Core: Add LockManager to HadoopTableOperations #3663
Conversation
cc: @kbendick, @jackye1995, @nastra, @aokolnychyi, @danielcweeks, @rdblue would you mind giving a review? thanks! |
| HadoopTableOperations.releaseLock.lock(); | ||
| if (fs.exists(dst)) { | ||
| throw new CommitFailedException( | ||
| "Version %d already exists: %s", nextVersion, dst); |
Does this need to be on a new line?
Thanks for your interest @CodingCat!
I have some concerns about this PR in general, as we don't support concurrent writers with the Hadoop catalog. I understand this is still supposed to be best effort and from within the same process, but I worry this will expand into further and further effort to emulate what one should be doing with a catalog backed by a metastore of some kind (be it JDBC, HMS, DynamoDB, etc.).
If others don't mind that, I won't object further.
My other concern is the incredibly large number of threads spawned in some of the tests. I think in our busy CI environment that this is going to be a real problem for stability, if not making the tests somewhat flaky. Could we reduce the number of threads spawned in each test to 2, or even 4 or so? 30 is definitely way too many, and 10 is still quite a lot for one test. We do share the CI infrastructure with a large number of other projects in the Apache Foundation.
| ); | ||
| Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class); | ||
| int threadsCount = 10; |
Nit: Same thing here about lowering the thread count.
| put(TableProperties.COMMIT_NUM_RETRIES, "1000"); | ||
| } | ||
| }, dir.toURI().toString()); | ||
| int threadsCount = 30; |
Nit / Somewhat blocking: Can we consider lowering this value? CI already takes some time, and I feel like spawning 30 threads in a test in our CI environment, where each of those threads runs a spark job, might be a little heavy given the very large number of concurrent users as well as the limited testing resources we get from github.
Would 4 or 5 threads be ok for all of the tests? Spawning 30 OS threads in a single test is likely to cause a large degree of contention.
| } catch (InterruptedException e) { | ||
| // intentionally swallow to check result later | ||
| } | ||
| }); | ||
| tableWithHighRetries.refresh(); | ||
| assertEquals(threadsCount, Lists.newArrayList(tableWithHighRetries.snapshots()).size()); |
Is there no danger of this being a potentially flaky test? I see that the commit number of retries is set to 1000.
But our CI environment can get very busy. I worry that having to set the retries to 1000 will cause the amount of time CI takes to increase by quite a bit. We do share test resources with the entirety of the ASF after all.
@CodingCat, If you want to avoid using a catalog implementation that provides an atomic rename (like JDBC, DynamoDB, or Hive) then you'd need to use a lock that can coordinate across processes. For using Glue as a metastore, @jackye1995 added a DynamoDB based lock, in |
Hi @kbendick and @rdblue, thank you very much for the review! IIUC, besides the thread-count concern (which I will address in code), your comments are both about the scope of HadoopTable. Let me try to address them here; I'd love to hear whether this makes sense to you.

First, this PR is only about multi-threading, not multiple processes. The reason we care about multi-threaded operation is that it is much more implicit than multiple processes: users easily realize there is more than one Spark application running in the cluster, but not necessarily that multiple threads are appending to the same table from within the same Spark application. That is exactly our case, a somewhat event-driven append workload where each event is processed by a Spark job triggered from a separate thread. So, as @kbendick said, this is best-effort work, and as @rdblue said, the multi-process limitation is still there.

Second, regarding @kbendick's concern about how to prevent this from extending into an endless effort to bring HadoopTable up to the capability of other catalog implementations, i.e. eventually making HadoopTable support distributed commits: I think that is a valid concern about community resource allocation and the project roadmap. My personal opinion is to find some way to explicitly fail the application (in this PR or another, depending on the workload) when multiple processes are detected committing, serving not only as a hard guard but also protecting users who didn't read the docs from silently losing data.

Third, regarding @rdblue's suggestion to make HadoopTableOperations support distributed commits by leveraging DynamoLockManager: I am hesitant to go down this path for now; please let me know if I misunderstood anything. Essentially we would make iceberg-core depend on iceberg-aws, which would introduce a circular dependency, and also a somewhat counter-intuitive architecture with a hardcoded component from AWS.
Lastly, as I said in the background info of the PR, companies like ours have been operating in this kind of environment for a while. There was a period when every customer was encouraged to write to a Parquet path instead of registering a table in HMS, and as a result hundreds of our data applications talk to the file system directly instead of HMS. The effort here is also about reaching functional parity with Delta Lake. Regarding the action items:
Love to hear your feedback!
nastra
left a comment
I would be much in favor of using something like https://github.com/awaitility/awaitility to test async code as this would reduce the chance for flakiness. @rdblue thoughts?
| } | ||
| Arrays.stream(threads).forEach(t -> { | ||
| try { | ||
| t.join(); |
this is unfortunately a recipe for flaky tests. It would be much nicer if we could use https://github.com/awaitility/awaitility and have something like Awaitility.await().atMost(X, SECONDS).untilAsserted(() -> assertThat(...).isEqualTo(..))
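For context, Awaitility's untilAsserted simply re-runs an assertion until it passes or a timeout expires. Below is a minimal, dependency-free sketch of that polling idea; the helper name and timings are illustrative, not Awaitility's actual API:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitSketch {
    // Re-run the assertion until it stops throwing or the timeout expires.
    static void awaitUntilAsserted(long timeoutMillis, Runnable assertion) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // give up and surface the last failure
                }
                Thread.sleep(50); // poll interval
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger commits = new AtomicInteger();
        // Simulate an async writer that finishes a little later.
        Thread writer = new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            commits.set(5);
        });
        writer.start();
        // Instead of a fixed join-then-assert, poll until the state converges.
        awaitUntilAsserted(2000, () -> {
            if (commits.get() != 5) throw new AssertionError("not yet: " + commits.get());
        });
        System.out.println("converged: " + commits.get());
        writer.join();
    }
}
```

With the real library this whole helper collapses to the one-liner quoted above: `Awaitility.await().atMost(2, SECONDS).untilAsserted(...)`.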
I agree in general, but in this case there are a couple of easier solutions.
First, these tests already exist for other catalogs, so we can adapt them to use HadoopCatalog. This should copy `TestJdbcTableConcurrency` and update it. Those tests aren't flaky.
Second, we have a trusted way to run tasks in a thread pool: the `Tasks` class that is used all over Iceberg. Using it is a really easy way to run tests like this; it is what `TestJdbcTableConcurrency` uses.
If that doesn't work, then a library like Awaitility works for me.
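As a rough illustration of the thread-pool approach, here is a dependency-free sketch using plain java.util.concurrent; Iceberg's `Tasks` builds a similar fan-out on top of an ExecutorService (with retry support on top). The names and counts here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FanOutSketch {
    public static void main(String[] args) throws Exception {
        int threadsCount = 4; // small, CI-friendly parallelism
        ExecutorService pool = Executors.newFixedThreadPool(threadsCount);
        ConcurrentLinkedQueue<Integer> committed = new ConcurrentLinkedQueue<>();
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < threadsCount; i++) {
            int id = i;
            futures.add(pool.submit(() -> committed.add(id))); // stand-in for one append
        }
        for (Future<?> f : futures) {
            // Waiting on futures rethrows task failures instead of silently
            // swallowing InterruptedException the way the original test did.
            f.get(30, TimeUnit.SECONDS);
        }
        pool.shutdown();
        System.out.println("commits=" + committed.size());
    }
}
```

The same shape with Iceberg's utility would be roughly `Tasks.range(threadsCount).executeWith(pool).run(i -> ...)`.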
| .load(location.toString()); | ||
| List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); | ||
| Assert.assertEquals("Number of rows should match", 3 * threadsCount, actual.size()); |
same comment here about using Awaitility to assert async code
@CodingCat, adding the ability to use a lock doesn't need to require a dependency on iceberg-aws. The lock manager was built as an API and multiple implementations, including an in-memory implementation. The interface and in-memory implementation could be moved into core and you would then dynamically load your preferred lock manager. That avoids the circular dependency and adds very similar functionality to what you're doing here out of the box. Regarding the use case, I understand wanting to have no dependency other than the object store. I think the right way to do that is to coordinate writers, whether those are threads or processes. If you choose to go with the in-memory lock manager and risk data loss that's up to you -- to your point, you would have reached parity so you may not want full locking. |
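As a sketch of the interface-plus-dynamic-loading idea described above: core only sees the interface and loads the configured class by name, so provider-specific implementations can live in other modules. The interface shape and class names below are simplified stand-ins, not the real org.apache.iceberg.LockManager API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LockLoadSketch {
    // Simplified shape of the lock API (the real interface also carries
    // initialization properties, timeouts, and lifecycle methods).
    interface LockManager {
        boolean acquire(String entityId, String ownerId);
        void release(String entityId, String ownerId);
    }

    // Minimal in-memory implementation: one owner per entity at a time.
    public static class InMemoryLockManager implements LockManager {
        private final Map<String, String> locks = new ConcurrentHashMap<>();
        public boolean acquire(String entityId, String ownerId) {
            return locks.putIfAbsent(entityId, ownerId) == null;
        }
        public void release(String entityId, String ownerId) {
            locks.remove(entityId, ownerId); // only removes if this owner holds it
        }
    }

    // Dynamic loading keeps core free of provider dependencies: the class
    // name comes from configuration, so e.g. an AWS-backed implementation
    // can live in iceberg-aws without core depending on it.
    static LockManager load(String impl) throws Exception {
        return (LockManager) Class.forName(impl).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        LockManager lm = load("LockLoadSketch$InMemoryLockManager");
        System.out.println("first=" + lm.acquire("v2.metadata.json", "writer-1"));
        System.out.println("second=" + lm.acquire("v2.metadata.json", "writer-2"));
        lm.release("v2.metadata.json", "writer-1");
        System.out.println("after-release=" + lm.acquire("v2.metadata.json", "writer-2"));
    }
}
```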
This is actually an interesting development. In the recent announcement of AWS Athena support for Iceberg, we mentioned that the Glue catalog integration uses Glue-native optimistic locking and no longer requires a lock such as DynamoLockManager. So we are planning to deprecate the lock manager once the new Glue SDK with that feature is released in a few weeks. We can move the LockManager interface and the in-memory implementation to core if that is the place that needs this feature the most. If you want to use the DynamoLockManager, we can keep that in the |
@rdblue @jackye1995 @kbendick I have moved LockManagers to core and added the capability for HadoopTableOperations to leverage it. Specifically, users can pass lock configuration via the Hadoop Configuration (prefix with |
Looks like there are Javadoc/build problems, but tests are passing so we can review while you fix those in parallel. |
| this.conf = conf; | ||
| this.location = location; | ||
| this.fileIO = fileIO; | ||
| this.lockManager = lockMgr; |
In Iceberg, we avoid abbreviations like manager -> mgr because it hurts readability and isn't actually easier to type.
| */ | ||
| package org.apache.iceberg.aws.glue; | ||
| package org.apache.iceberg; |
@jackye1995, couldn't this still be in the util package even if it is in the API module?
yes it can still be in the util package.
I think the actual module and package of this class depends on how we want to position this feature. I remember you mentioned we might want to eventually introduce some kind of pessimistic lock to fully lock a table to make certain operations succeed.
So the following makes sense to me:
- if it's only for `HadoopTables` (or maybe `HadoopCatalog`), then the `iceberg-core` module can work and it can be in either `org.apache.iceberg.hadoop` or `org.apache.iceberg.util`.
- if we want this to do other things in the future, like fully locking a table in a catalog, then I think the `iceberg-api` module and the `org.apache.iceberg` package seem more official.
I think that having this in the API module makes sense, but I would put LockManagers and the in-memory implementation in core. I'd also move the implementation back to the util package since we want to avoid making org.apache.iceberg any more huge.
| public static final long CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5); | ||
| public static final String APP_ID = "app-id"; | ||
| public static final String USER = "user"; |
Can you revert the changes to this file, please? This doesn't look like a necessary change.
hmmm...strange, seems unsynced from master branch
| public static final String APP_ID = "app-id"; | ||
| public static final String USER = "user"; | ||
Looks like this should be reverted as well.
| private void renameToFinal(FileSystem fs, Path src, Path dst) { | ||
| private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { | ||
| try { | ||
| lockManager.acquire(dst.toString(), src.toString()); |
For future me: this looks good.
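To illustrate why acquiring a per-destination lock closes the race, here is a toy sketch with an in-memory "filesystem" and lock set; everything here is a simplified stand-in for the real HadoopTableOperations/LockManager code, not its API:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GuardedCommitSketch {
    // Toy "filesystem": path -> contents. Stands in for HDFS in this sketch.
    static final Map<String, String> fs = new ConcurrentHashMap<>();
    static final Set<String> held = ConcurrentHashMap.newKeySet();

    // With the destination locked, the exists-check and the rename execute
    // as one critical section, so two writers can no longer both pass the
    // check and rename onto the same version file.
    static boolean commit(String src, String dst) {
        if (!held.add(dst)) {
            return false; // another writer holds the lock; caller retries
        }
        try {
            if (fs.containsKey(dst)) {
                return false; // a CommitFailedException in the real code
            }
            fs.put(dst, fs.remove(src)); // the "rename"
            return true;
        } finally {
            held.remove(dst);
        }
    }

    public static void main(String[] args) throws Exception {
        int writers = 4;
        ExecutorService pool = Executors.newFixedThreadPool(writers);
        CountDownLatch done = new CountDownLatch(writers);
        for (int i = 0; i < writers; i++) {
            int id = i;
            pool.submit(() -> {
                fs.put("staging-" + id, "snapshot-" + id);
                int version = 2;
                // retry with the next version on conflict, like the commit loop
                while (!commit("staging-" + id, "v" + version + ".metadata.json")) {
                    version++;
                }
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        System.out.println("versions=" + fs.size()); // all 4 commits kept, distinct files
    }
}
```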
| int threadsCount = 5; | ||
| Table tableWithHighRetries = TABLES.create(SCHEMA, SPEC, new HashMap<String, String>() { | ||
| { | ||
| put(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(threadsCount)); |
Instead of new HashMap, we prefer Maps.newHashMap in most cases. But here, you can use ImmutableMap to more succinctly create a map: ImmutableMap.of(COMMIT_NUM_RETRIES, String.valueOf(threadsCount)).
| } | ||
| @Test | ||
| public void testConcurrentAppend() throws Exception { |
It's fine to add more tests, but please copy the testConcurrentFastAppends test from JDBC into this suite. We have validated that it catches quite a few problems, so we should reuse it.
Can you rename the method to match the source?
Actually, can you copy the original without modification other than to set up the catalog? It is too heavily modified from the original version.
Actually, there are no significant changes, just extracting some constants into variables and changing their values.
The main reason for the larger values is to make the issue easier to reproduce (it is hard to see commits fail with only 2 threads, as set in the JDBC test).
| @Test | ||
| public void testConcurrentAppend() throws Exception { | ||
| assertTrue("Should create v1 metadata", | ||
| version(1).exists() && version(1).isFile()); |
| public class HadoopTables implements Tables, Configurable { | ||
| public static final String LOCK_PROPERTY_PREFIX = "iceberg.tables.hadoop."; | ||
Can you make this private and remove the newline? I don't think there's a reason for this prefix to be public.
this is actually to address the comments at #3663 (comment) from @jackye1995
Thanks, @CodingCat! I merged this. It should work as you intend now, although I still recommend using a real lock manager in production! |
| tableWithHighRetries.refresh(); | ||
| assertEquals(threadsCount * numberOfCommitedFilesPerThread, | ||
| Lists.newArrayList(tableWithHighRetries.snapshots()).size()); |
I ran this locally without the lock manager additions and it failed as expected.
Hey Ryan, I consistently get this test failure locally:
org.apache.iceberg.hadoop.TestHadoopCommits > testConcurrentFastAppends FAILED
java.lang.AssertionError: expected:<50> but was:<40>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.junit.Assert.assertEquals(Assert.java:633)
at org.apache.iceberg.hadoop.TestHadoopCommits.testConcurrentFastAppends(TestHadoopCommits.java:472)
1353 tests completed, 1 failed, 21 skipped
Is this expected?
I ran it again and this time I got 45 as the actual value. The actual value seems to change on every run.
org.apache.iceberg.hadoop.TestHadoopCommits > testConcurrentFastAppends FAILED
java.lang.AssertionError: expected:<50> but was:<45>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.junit.Assert.assertEquals(Assert.java:633)
at org.apache.iceberg.hadoop.TestHadoopCommits.testConcurrentFastAppends(TestHadoopCommits.java:472)
This PR addresses the data loss issue that occurs when multiple threads append to a HadoopTable within the same process.
What we observed
While working on a migration from Delta Lake to Iceberg, we found that one of our test cases, which concurrently appends data to a table with Spark, failed with data loss and/or writer job aborts.
Specifically, with e.g. 10 threads appending at the same time, we eventually end up with anywhere between 1 and 10 table versions (with some instrumentation, I saw that multiple threads eventually committed the same version).
Root Cause Analysis
With some analysis, we found the root cause in the current mechanism for checking version conflicts. Currently:
(1) HadoopTableOperations checks the existence of the target metadata file path and, if it exists, throws CommitFailedException
(2) if (1) passes, HadoopTableOperations relies on the return value of FileSystem.rename to check whether the commit eventually succeeded
There are two problems here:
(a) steps (1) and (2) are not atomic, so multiple threads can pass (1) and then enter (2)
(b) FileSystem.rename is not sufficient to check whether the target file exists (its behavior is really implementation-dependent); as a result, multiple staging metadata files can be renamed to the same target, e.g. multiple threads commit to v2.metadata.json
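The two-step race can be shown deterministically with a toy map-based "filesystem" standing in for HDFS (illustrative only; real rename semantics vary by FileSystem implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class RenameRaceSketch {
    public static void main(String[] args) {
        // Toy stand-in for the filesystem: path -> contents.
        Map<String, String> fs = new HashMap<>();
        fs.put("staging-A", "snapshot-A");
        fs.put("staging-B", "snapshot-B");
        String dst = "v2.metadata.json";

        // Step (1) for BOTH writers: the existence check. Because the check
        // and the rename are not one atomic operation, both writers can
        // observe "v2 does not exist" before either has renamed.
        boolean aSeesFree = !fs.containsKey(dst);
        boolean bSeesFree = !fs.containsKey(dst);

        // Step (2): both proceed to rename. On filesystems where rename
        // overwrites an existing target, the second rename silently
        // clobbers the first commit.
        if (aSeesFree) fs.put(dst, fs.remove("staging-A"));
        if (bSeesFree) fs.put(dst, fs.remove("staging-B"));

        System.out.println(dst + " -> " + fs.get(dst)); // snapshot-A was lost
    }
}
```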
Proposed Fix
We move the LockManagers class, originally located in iceberg-aws, to iceberg-core and add the capability for HadoopTables to leverage it for concurrency control during commits.
By default, HadoopTables uses InMemoryLockManager and can only handle multi-threaded appends. Users can pass configuration via the Hadoop Configuration, or via HadoopCatalog when creating HadoopTables, to switch to another implementation such as a DynamoDB-based LockManager to handle distributed commits.
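Assuming the `iceberg.tables.hadoop.` property prefix that appears in this PR's diff, switching the lock implementation might look like the following Hadoop configuration fragment; the exact key suffix and the implementation class name are hypothetical placeholders to illustrate the mechanism, and should be checked against the merged code:

```xml
<!-- e.g. in core-site.xml; key suffix and class name are illustrative -->
<property>
  <name>iceberg.tables.hadoop.lock-impl</name>
  <!-- fully-qualified name of a class implementing the LockManager interface -->
  <value>com.example.DistributedLockManager</value>
</property>
```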