[#232] Fix partition number for stored id mapping for better performance #236
base: master
Conversation
could you rebase to pick up the fix for tests?
Sounds like a good idea - do you know if there is a significant performance difference?
@@ -476,7 +476,7 @@ class GraphFrame private(
       col(ATTR + "." + ID).cast("long").as(LONG_ID), col(ATTR + "." + ID).as(ID), col(ATTR))
   } else {
     val withLongIds = vertices.select(ID)
-      .repartition(col(ID))
+      .repartition(vertices.rdd.getNumPartitions,col(ID))
please add a space after ,
Actually it looks like a different problem with the build, let me investigate
I've merged in the fix, please rebase
[#232] Fix partition number for stored id mapping for better performance

The repartition() method by default creates 200 (or the configured spark.sql.shuffle.partitions) partitions, which causes performance problems for small graphs and OOM problems for large ones. To fix both problems, let's make the id mapping DF have the same number of partitions as the original vertex DF. I also found that the join still tries to repartition the result to 200 partitions for sophisticated DFs, so I repartition both sides of the join.
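A minimal sketch of the behavior described above, assuming a local SparkSession; the data and partition counts are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[4]").getOrCreate()

// A small vertex DF that lives in only 4 partitions.
val vertices = spark.range(0, 100).toDF("id").repartition(4)

// repartition(col("id")) falls back to spark.sql.shuffle.partitions
// (200 by default), so the id-mapping DF balloons to 200 partitions.
val mappingDefault = vertices.repartition(col("id"))
println(mappingDefault.rdd.getNumPartitions) // 200

// Passing the parent's partition count keeps the mapping DF the same
// size as the original vertex DF, which is what this PR changes.
val mappingSized = vertices.repartition(vertices.rdd.getNumPartitions, col("id"))
println(mappingSized.rdd.getNumPartitions)   // 4
```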
Force-pushed from 3809088 to fcb9183
      .distinct()
      .sortWithinPartitions(ID)
      .withColumn(LONG_ID, monotonically_increasing_id())
      .persist(StorageLevel.MEMORY_AND_DISK)
-   vertices.select(col(ID), nestAsCol(vertices, ATTR))
+   repartitionedVertices.select(col(ID), nestAsCol(repartitionedVertices, ATTR))
I played with the Cassandra data source and with an HDFS-based dataset, and it looks like this join repartitions the resulting DF back to 200 partitions. So I explicitly repartition both sides of the join to the same number of partitions.
I use the "ID" column for partitioning both sides, so the join should not repartition either of them.
…rsonalizedPageRank test. So the test update is in this separate ticket
I'm sorry for the delay with answers. Here is a better fix. It also fixes issue #235.
It is now: 72273ms. If I coalesce the graph I get an even better result with the fix:
7996ms
Codecov Report
@@            Coverage Diff            @@
##           master     #236     +/-   ##
=========================================
+ Coverage   89.18%    89.2%   +0.01%
=========================================
  Files          20       20
  Lines         740      741       +1
  Branches       40       43       +3
=========================================
+ Hits          660      661       +1
  Misses         80       80

Continue to review full report at Codecov.
Cool - thanks, that fixes personalized page rank.
Could you think of a more direct test for the change to indexedVertices?
Do you mean add more unit tests? Give me some time, I'll try to create some...
Yes, to check that the changes to indexedVertices haven't broken anything
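One possible shape for such a test; it assumes the suite sits in the org.graphframes package (so the internal indexedVertices and LONG_ID are visible) and uses made-up fixture data:

```scala
import org.graphframes.GraphFrame

test("indexedVertices assigns one unique long id per vertex") {
  import spark.implicits._
  val v = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "attr")
  val e = Seq((1L, 2L), (2L, 3L)).toDF("src", "dst")
  val g = GraphFrame(v, e)

  val ids = g.indexedVertices.select(GraphFrame.LONG_ID).as[Long].collect()
  assert(ids.length == 3)                   // one id per vertex
  assert(ids.distinct.length == ids.length) // ids are unique
  assert(ids.forall(_ < Int.MaxValue))      // small enough for GraphX
}
```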
…ithIndex. Add test.

The DF monotonically_increasing_id() method assigns ids that are greater than max int: it packs the partition id into the high bits. Some GraphX algorithms require ids to be less than max int, so monotonically_increasing_id() cannot be used. RDD has a zipWithIndex method that provides proper indexing; DF has none, so we convert it to an RDD, add the index, and convert back. A test that checks the correctness of id assignment is also added. It also checks that id < Int.MaxValue.
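A minimal sketch of that RDD round-trip; withConsecutiveIndex is a hypothetical helper name:

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// DF has no zipWithIndex, so drop to the RDD API: zipWithIndex assigns
// consecutive 0-based Long indices across all partitions, so ids stay
// below Int.MaxValue as long as there are fewer than 2^31 rows.
def withConsecutiveIndex(df: DataFrame, idCol: String): DataFrame = {
  val indexed = df.rdd.zipWithIndex().map { case (row, idx) =>
    Row.fromSeq(row.toSeq :+ idx)
  }
  val schema = StructType(df.schema.fields :+ StructField(idCol, LongType, nullable = false))
  df.sparkSession.createDataFrame(indexed, schema)
}
```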
Sorry for the long delay. While I was doing other things I realised that…
    val repartitionedVertices = vertices.repartition(vertices.rdd.getNumPartitions, col(ID))
    val idDF = repartitionedVertices.select(ID)
    val idType = idDF.schema(0)
    // The DF monotonically_increasing_id() method assigns ids that are greater than max int,
@artem-aliev
zipWithIndex generates a Long index, doesn't it?
For large graphs (>Max.INT vertices) you want to have unique ids.
Can you elaborate?
Right, this limitation still exists; it is a GraphX one. But I do not expect any graph algorithm to finish in reasonable time with >Max.INT vertices.
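For context on why the old ids broke that limit: per the Spark docs, monotonically_increasing_id() puts the partition id in the upper 31 bits and the per-partition record number in the lower 33 bits, so any row outside partition 0 already exceeds Int.MaxValue. A toy model of the documented layout:

```scala
// Toy model of monotonically_increasing_id()'s documented bit layout.
def monotonicId(partitionId: Int, recordNumber: Long): Long =
  (partitionId.toLong << 33) | recordNumber

monotonicId(0, 0L) // 0
monotonicId(1, 0L) // 8589934592 = 2^33, far above Int.MaxValue (2147483647)
```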
Well - disagree, we have regular graphs with tens of billions of vertices.
Adding this limitation of 2,147,483,647 vertices will just limit graphframes usage.