
Core: Add LockManager to HadoopTableOperations#3663

Merged
rdblue merged 29 commits into
apache:masterfrom
CodingCat:hadoop_concurrent_1
Dec 30, 2021

Conversation

@CodingCat
Contributor

@CodingCat CodingCat commented Dec 3, 2021

This PR addresses a data loss issue that occurs when multiple threads append to a HadoopTable within the same process.

What we observed

While working on a migration from Delta Lake to Iceberg, we found that a test case that concurrently appends data to a table with Spark failed with data loss and/or writer job aborts.

Specifically, when, e.g., 10 threads append at the same time, we end up with anywhere between 1 and 10 table versions (with some instrumentation, I saw that multiple threads eventually committed the same version).

Root Cause Analysis

With some analysis, we found the root cause in the current mechanism for checking version conflicts. Currently:

(1) HadoopTableOperations checks whether the target metadata file path exists; if it does, it throws CommitFailedException
(2) if (1) passes, HadoopTableOperations relies on the return value of FileSystem.rename to check whether the commit eventually succeeded

There are two problems here:

(a) steps (1) and (2) are not atomic, so multiple threads can pass (1) and then enter (2)
(b) FileSystem.rename is not sufficient to check whether the target file exists (the behavior is really implementation-dependent); as a result, multiple staging metadata files can be renamed to the same destination, e.g. multiple threads eventually commit to v2.metadata.json
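To make the interleaving concrete, here is a minimal stdlib sketch (not Iceberg code): a HashSet stands in for the destination file system, and the two "threads" are interleaved by hand to show that both can pass the existence check before either one renames.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the race: a HashSet stands in for the destination file system.
// The two committers are interleaved by hand, so no real concurrency is
// needed to show the unsafe window between steps (1) and (2).
public class NonAtomicCheck {

    // returns {aPassedCheck, bPassedCheck, aRenamed, bRenamed}
    static boolean[] interleavedCommit() {
        Set<String> fileSystem = new HashSet<>();
        String dst = "v2.metadata.json";

        // Step (1) for thread A: dst does not exist, so A proceeds.
        boolean aPassedCheck = !fileSystem.contains(dst);
        // Thread B is scheduled next and runs step (1) before A renames,
        // so B also sees no conflict.
        boolean bPassedCheck = !fileSystem.contains(dst);

        // Step (2): both rename to the same destination. Set.add at least
        // reports the collision; FileSystem.rename may not, which is
        // exactly problem (b) above.
        boolean aRenamed = fileSystem.add(dst);
        boolean bRenamed = fileSystem.add(dst);
        return new boolean[] {aPassedCheck, bPassedCheck, aRenamed, bRenamed};
    }

    public static void main(String[] args) {
        boolean[] r = interleavedCommit();
        System.out.println(r[0] && r[1]); // true: both passed the check
        System.out.println(r[3]);         // false: B's commit is lost
    }
}
```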

Proposed Fix

We move the LockManagers class, originally located in iceberg-aws, to iceberg-core and add the capability for HadoopTables to leverage it for concurrency control when committing.

By default, HadoopTables will use InMemoryLockManager and will only be able to handle multi-threaded appends. Users can pass in configuration via the Hadoop Configuration, or via HadoopCatalog when creating HadoopTables, to switch to another implementation such as a DynamoDB-based LockManager to handle distributed commits.
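As a rough illustration of the in-process approach (the names below are illustrative, not the actual LockManager API), a per-destination lock makes the exists-check and the rename one critical section, so every thread ends up on a distinct version:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the fix: a per-destination in-process lock wraps the
// existence check and the "rename" so they execute atomically.
public class LockedCommitSketch {
    static final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    static final Set<String> committedVersions = ConcurrentHashMap.newKeySet();

    static boolean commit(String dst) {
        ReentrantLock lock = locks.computeIfAbsent(dst, k -> new ReentrantLock());
        lock.lock();
        try {
            if (committedVersions.contains(dst)) {
                return false;                  // CommitFailedException in real code
            }
            return committedVersions.add(dst); // the "rename", now race-free
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        int threads = 4;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                // on conflict, retry with the next version, like COMMIT_NUM_RETRIES
                int v = 1;
                while (!commit("v" + v + ".metadata.json")) {
                    v++;
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(committedVersions.size()); // 4: no lost commits
    }
}
```

This only coordinates threads within one JVM; as discussed below, concurrent processes still need an external lock or a metastore-backed catalog.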

@CodingCat CodingCat changed the title [WIP] allowing concurrent commits from multiple threads in HadoopTable [WIP] allowing concurrent appending from multiple threads in HadoopTable Dec 3, 2021
@CodingCat CodingCat changed the title [WIP] allowing concurrent appending from multiple threads in HadoopTable [WIP] fix data loss in multi-threading append with HadoopTable Dec 3, 2021
@CodingCat CodingCat changed the title [WIP] fix data loss in multi-threading append with HadoopTable fix data loss in multi-threading append with HadoopTable Dec 4, 2021
@CodingCat
Contributor Author

cc: @kbendick, @jackye1995, @nastra, @aokolnychyi, @danielcweeks, @rdblue

would you mind giving a review? thanks!

HadoopTableOperations.releaseLock.lock();
if (fs.exists(dst)) {
throw new CommitFailedException(
"Version %d already exists: %s", nextVersion, dst);
Contributor

Does this need to be on a new line?

Contributor

@kbendick kbendick left a comment


Thanks for your interest @CodingCat!

I have some concerns about this PR in general, as we don't support concurrent writers with the Hadoop catalog. I understand this is still supposed to be best effort and from within the same process, but I do worry this will expand into further and further effort to emulate what one should get from a catalog with a metastore of some kind (be it JDBC, HMS, DynamoDB, etc).

If others don't mind that, I won't object further.

My other concern is the incredibly large number of threads spawned in some of the tests. I think that in our busy CI environment this is going to be a real problem for stability, if not make the tests somewhat flaky. Could we reduce the number of threads spawned in each test to 2, or even 4 or so? 30 is definitely way too many, and 10 is still quite a lot for one test. We do share the CI infrastructure with a large number of other projects in the Apache Foundation.

);

Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
int threadsCount = 10;
Contributor

Nit: Same thing here about lowering the thread count.

put(TableProperties.COMMIT_NUM_RETRIES, "1000");
}
}, dir.toURI().toString());
int threadsCount = 30;
Contributor

Nit / Somewhat blocking: Can we consider lowering this value? CI already takes some time, and I feel like spawning 30 threads in a test in our CI environment, where each of those threads runs a spark job, might be a little heavy given the very large number of concurrent users as well as the limited testing resources we get from github.

Would 4 or 5 threads be ok for all of the tests? Spawning 30 OS threads in one single test is likely to cause a large degree of contention.

What about if we changed threadsCount to 4 or 5? Would that still be ok?

Comment on lines +447 to +452
} catch (InterruptedException e) {
// intentionally swallow to check result later
}
});
tableWithHighRetries.refresh();
assertEquals(threadsCount, Lists.newArrayList(tableWithHighRetries.snapshots()).size());
Contributor

Is there no danger here of having a potentially flaky test? I see that the commit number of retries is set to 1000.

But our CI environment can get very busy. I worry that having to set the retries to 1000 will cause the amount of time CI takes to increase by quite a bit. We do share test resources with the entirety of the ASF after all.

@rdblue
Contributor

rdblue commented Dec 5, 2021

@CodingCat, HadoopTableOperations is not safe to use with a file system that doesn't support atomic rename. Even if we were to add a lock as proposed in this PR, concurrent processes could still produce the same version in stores like S3 or even most local file systems.

If you want to avoid using a catalog implementation that provides an atomic rename (like JDBC, DynamoDB, or Hive) then you'd need to use a lock that can coordinate across processes.

For using Glue as a metastore, @jackye1995 added a DynamoDB based lock, in DynamoLockManager. I recommend adding the ability for HadoopTableOperations to use the DynamoDB lock instead of adding a JVM lock.

@CodingCat
Contributor Author

Hi @kbendick and @rdblue, thank you very much for the review! IIUC, besides the concern about too many threads (which I will address in code), your comments both relate to the scope of HadoopTable. Let me try to address them here; I'd love to hear whether this makes sense to you.

First, this PR is only for multiple threads, not multiple processes. The reason we care about multi-threaded operation is that it is much more implicit than multiple processes: users may easily realize there is more than one Spark application running in the cluster, but not necessarily realize there are multiple threads appending to the same table from within the same Spark application. That is exactly our case, which is an event-driven append scenario where each event is processed by a Spark job triggered from a separate thread. So as @kbendick said, this is best-effort work, and as @rdblue said, the multiple-processes limitation is still there.

Second, regarding @kbendick's concern about preventing this from extending into an endless effort to bring HadoopTable up to the capability of other catalog implementations, i.e. making HadoopTable eventually support distributed commits: I think it is a valid concern about the community's resource allocation and the project's roadmap. My personal opinion is to find some way to explicitly fail the application (in this PR or another, depending on the workload) when multiple processes are found committing, not only serving as a hard guard but also protecting users who didn't read the docs from losing data.

Third, regarding @rdblue's suggestion to make HadoopTableOperations support distributed commits by leveraging DynamoLockManager: I am hesitant to go down this path for now; please let me know if I misunderstood anything. Essentially, we would make iceberg-core depend on iceberg-aws, which would introduce a circular dependency and a somewhat counter-intuitive architecture with a hardcoded component from AWS...

Lastly, as I said in the background info of the PR, companies like ours have been operating in this kind of environment for a while. There was a period when every customer was encouraged to write to Parquet paths instead of registering tables in HMS, and it has left hundreds of our data applications talking to the file system directly instead of to HMS. The effort here is also about reaching functional parity with Delta Lake.

Regarding the action items:

  1. I will definitely address the thread-count issue; the retry limit of 1000 and the thread counts are conservative numbers that are easy to lower. In theory, the retry limit can be <= the thread count.
  2. If you all agree, I will try to find some way to fail applications explicitly when there are distributed commits to a HadoopTable. I will share my findings and, depending on the complexity, we can decide how to move forward.

Love to hear your feedback!

Contributor

@nastra nastra left a comment

I would be much in favor of using something like https://github.com/awaitility/awaitility to test async code as this would reduce the chance for flakiness. @rdblue thoughts?

}
Arrays.stream(threads).forEach(t -> {
try {
t.join();
Contributor

this is unfortunately a recipe for flaky tests. It would be much nicer if we could use https://github.com/awaitility/awaitility and have something like Awaitility.await().atMost(X, SECONDS).untilAsserted(() -> assertThat(...).isEqualTo(..))

Contributor

I agree in general, but in this case there are a couple of easier solutions.

First, these tests already exist for other catalogs so we can adapt them to use HadoopCatalog. This should copy TestJdbcTableConcurrency and update it. Those tests aren't flaky.

Second, we have a trusted way to run tasks in a thread pool, the Tasks class that is used all over Iceberg. Using that is a really easy way to run tests like this. It is what TestJdbcTableConcurrency uses.

If that doesn't work, then a library like Awaitility works for me.
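The Tasks class is internal to Iceberg, but the shape of the suggestion can be sketched with the plain JDK: submit all commits to a fixed pool, wait on every future with a bounded timeout, then assert exactly once. The runCommits helper and the AtomicInteger stand-in for table state are illustrative, not real test code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a deterministic concurrency test: no Thread arrays, no join
// calls swallowing InterruptedException, and one assertion at the end.
public class PoolBasedConcurrencyTest {

    // Submit `tasks` commit stand-ins to a fixed pool, wait on every future
    // (propagating failures, with a bounded wait), and return the count.
    static int runCommits(int tasks, int poolSize) throws Exception {
        AtomicInteger snapshots = new AtomicInteger(); // stand-in for table state
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(pool.submit(snapshots::incrementAndGet));
        }
        for (Future<?> f : futures) {
            f.get(30, TimeUnit.SECONDS); // task failures surface here
        }
        pool.shutdown();
        return snapshots.get();
    }

    public static void main(String[] args) throws Exception {
        // all tasks are known to be done before asserting, so no flakiness
        System.out.println(runCommits(5, 2)); // prints 5
    }
}
```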

.load(location.toString());

List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Number of rows should match", 3 * threadsCount, actual.size());
Contributor

same comment here about using Awaitility to assert async code

@rdblue
Contributor

rdblue commented Dec 6, 2021

@CodingCat, adding the ability to use a lock doesn't need to require a dependency on iceberg-aws. The lock manager was built as an API and multiple implementations, including an in-memory implementation. The interface and in-memory implementation could be moved into core and you would then dynamically load your preferred lock manager. That avoids the circular dependency and adds very similar functionality to what you're doing here out of the box.

Regarding the use case, I understand wanting to have no dependency other than the object store. I think the right way to do that is to coordinate writers, whether those are threads or processes. If you choose to go with the in-memory lock manager and risk data loss that's up to you -- to your point, you would have reached parity so you may not want full locking.
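The dynamic-loading idea can be sketched with plain reflection (interface, class names, and the acquire signature below are made up for illustration): core defines the interface and resolves the configured implementation class at runtime, so iceberg-core never compiles against iceberg-aws.

```java
// Sketch of dynamic loading: the caller configures an implementation class
// name, and core instantiates it reflectively. A DynamoDB-backed class
// could live in another module and be named the same way.
public class DynamicLoadSketch {

    public interface LockManager {
        boolean acquire(String entityId, String ownerId);
    }

    // default implementation shipped with core in this sketch
    public static class InMemoryLockManager implements LockManager {
        private final java.util.Set<String> held =
            java.util.concurrent.ConcurrentHashMap.newKeySet();

        @Override
        public boolean acquire(String entityId, String ownerId) {
            return held.add(entityId); // first owner wins until release
        }
    }

    static LockManager load(String impl) throws Exception {
        // impl comes from configuration; no compile-time dependency needed
        return (LockManager) Class.forName(impl)
            .getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        LockManager lm = load("DynamicLoadSketch$InMemoryLockManager");
        System.out.println(lm.acquire("v2.metadata.json", "thread-1")); // true
        System.out.println(lm.acquire("v2.metadata.json", "thread-2")); // false
    }
}
```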

@jackye1995
Contributor

jackye1995 commented Dec 9, 2021

This is actually an interesting development. In the recent announcement about AWS Athena support for Iceberg, we mentioned that the Glue catalog integration uses Glue's native optimistic locking and no longer requires any lock such as DynamoLockManager. So we are planning to deprecate the lock manager once the new Glue SDK with that feature is released in a few weeks.

We can move the LockManager interface and in-memory implementation to core if that is the place which needs this feature the most. If you want to use the DynamoLockManager, we can keep that in the iceberg-aws module for dynamic loading.

@github-actions github-actions Bot added the AWS label Dec 11, 2021
@CodingCat
Contributor Author

@rdblue @jackye1995 @kbendick I have moved LockManagers to core and added the capability for HadoopTableOperations to leverage it. Specifically, users can pass in lock configuration via the Hadoop Configuration (prefixed with iceberg.). Please let me know if this is the approach you'd like. I also reduced the number of threads in the tests.
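The prefix-based configuration pattern can be sketched as follows, with a plain Map standing in for Hadoop's Configuration and illustrative key names (not the actual property names used by this PR):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: pull all keys under a prefix out of a key-value configuration,
// stripping the prefix, so they can be handed to a lock manager factory.
public class PrefixedConfSketch {

    static Map<String, String> propertiesWithPrefix(Map<String, String> conf, String prefix) {
        Map<String, String> result = new HashMap<>();
        conf.forEach((k, v) -> {
            if (k.startsWith(prefix)) {
                result.put(k.substring(prefix.length()), v); // drop the prefix
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("iceberg.lock-impl", "org.example.MyLockManager"); // illustrative
        conf.put("fs.defaultFS", "file:///");                       // unrelated key
        Map<String, String> lockProps = propertiesWithPrefix(conf, "iceberg.");
        System.out.println(lockProps); // only the iceberg.* entry, prefix stripped
    }
}
```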

@rdblue
Contributor

rdblue commented Dec 12, 2021

Looks like there are Javadoc/build problems, but tests are passing so we can review while you fix those in parallel.

this.conf = conf;
this.location = location;
this.fileIO = fileIO;
this.lockManager = lockMgr;
Contributor

In Iceberg, we avoid abbreviations like manager -> mgr because it hurts readability and isn't actually easier to type.

@github-actions github-actions Bot added the API label Dec 22, 2021
*/

package org.apache.iceberg.aws.glue;
package org.apache.iceberg;
Contributor

@jackye1995, couldn't this still be in the util package even if it is in the API module?

Contributor

yes it can still be in the util package.

I think the actual module and package of this class depend on how we want to position this feature. I remember you mentioned we might want to eventually introduce some kind of pessimistic lock to fully lock a table so certain operations can succeed.

So the following makes sense to me:

  1. if it's only for HadoopTables (or maybe HadoopCatalog), then iceberg-core module can work and it can either be in org.apache.iceberg.hadoop or org.apache.iceberg.util.
  2. if we want this to do other things in the future like fully locking a table in a catalog, then I think iceberg-api module and org.apache.iceberg package seems more official.

Contributor

I think that having this in the API module makes sense, but I would put LockManagers and the in-memory implementation in core. I'd also move the implementation back to the util package since we want to avoid making org.apache.iceberg any more huge.

public static final long CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5);

public static final String APP_ID = "app-id";
public static final String USER = "user";
Contributor

Can you revert the changes to this file, please? This doesn't look like a necessary change.

Contributor Author

hmmm...strange, seems unsynced from master branch


public static final String APP_ID = "app-id";
public static final String USER = "user";

Contributor

Looks like this should be reverted as well.

Comment thread core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
private void renameToFinal(FileSystem fs, Path src, Path dst) {
private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) {
try {
lockManager.acquire(dst.toString(), src.toString());
Contributor

For future me: this looks good.

int threadsCount = 5;
Table tableWithHighRetries = TABLES.create(SCHEMA, SPEC, new HashMap<String, String>() {
{
put(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(threadsCount));
Contributor

Instead of new HashMap, we prefer Maps.newHashMap in most cases. But here, you can use ImmutableMap to more succinctly create a map: ImmutableMap.of(COMMIT_NUM_RETRIES, String.valueOf(threadsCount)).

Contributor Author

done

}

@Test
public void testConcurrentAppend() throws Exception {
Contributor

It's fine to add more tests, but please copy the testConcurrentFastAppends test from JDBC into this suite. We have validated that it catches quite a few problems, so we should reuse it.

Contributor Author

done

Contributor

Can you rename the method to match the source?

Contributor

Actually, can you copy the original without modification other than to set up the catalog? It is too heavily modified from the original version.

Contributor Author

actually there are no significant changes, just turning some constants into variables and changing the values of those constants.

the major reason for the larger values is to make the issue easier to reproduce (it is hard to see a commit failure with only 2 threads, as set in the JDBC test)

@Test
public void testConcurrentAppend() throws Exception {
assertTrue("Should create v1 metadata",
version(1).exists() && version(1).isFile());
Contributor

Is this newline necessary?

Comment thread core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
Comment thread core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
public class HadoopTables implements Tables, Configurable {

public static final String LOCK_PROPERTY_PREFIX = "iceberg.tables.hadoop.";

Contributor

Can you make this private and remove the newline? I don't think there's a reason for this prefix to be public.

Contributor Author

this is actually to address the comments at #3663 (comment) from @jackye1995

Contributor

That's okay, then.

Comment thread core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@rdblue rdblue changed the title fix data loss in concurrent appending with HadoopTable Core: Add LockManager to HadoopTableOperations Dec 30, 2021
@rdblue rdblue merged commit 8afcdff into apache:master Dec 30, 2021
@rdblue
Contributor

rdblue commented Dec 30, 2021

Thanks, @CodingCat! I merged this. It should work as you intend now, although I still recommend using a real lock manager in production!


tableWithHighRetries.refresh();
assertEquals(threadsCount * numberOfCommitedFilesPerThread,
Lists.newArrayList(tableWithHighRetries.snapshots()).size());
Contributor

I ran this locally without the lock manager additions and it failed as expected.

@yihaizhu yihaizhu Jan 20, 2022

Hey Ryan, I consistently hit this test failure locally:
org.apache.iceberg.hadoop.TestHadoopCommits > testConcurrentFastAppends FAILED
java.lang.AssertionError: expected:<50> but was:<40>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.junit.Assert.assertEquals(Assert.java:633)
at org.apache.iceberg.hadoop.TestHadoopCommits.testConcurrentFastAppends(TestHadoopCommits.java:472)

1353 tests completed, 1 failed, 21 skipped

Is this expected?


I ran it again and this time I got the actual value 45. The actual value seems to change on every run.

org.apache.iceberg.hadoop.TestHadoopCommits > testConcurrentFastAppends FAILED
java.lang.AssertionError: expected:<50> but was:<45>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.junit.Assert.assertEquals(Assert.java:633)
at org.apache.iceberg.hadoop.TestHadoopCommits.testConcurrentFastAppends(TestHadoopCommits.java:472)
