[#232] Fix partition number for stored id mapping for better performance #236

Open
wants to merge 3 commits into master

Conversation

artem-aliev

The repartition() method by default creates 200 partitions (or the fixed spark.sql.shuffle.partitions value),
which causes performance problems for small graphs and OOM problems for large ones.
To fix both problems, let's make the id mapping DF have the same number of partitions as the original vertex DF.
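
A minimal sketch of the idea (illustrative only; vertices stands for the user-supplied vertex DataFrame, and the literal column names "id" and "long_id" stand in for the ID and LONG_ID constants):

import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

// Current behaviour: repartition(col("id")) falls back to spark.sql.shuffle.partitions
// (200 unless configured), regardless of how the vertex DataFrame is partitioned.
val defaultMapping = vertices.select("id").repartition(col("id"))

// Proposed behaviour: reuse the vertex DataFrame's own partition count, so small graphs
// avoid 200 near-empty partitions and large graphs avoid OOM from oversized ones.
val sizedMapping = vertices.select("id")
  .repartition(vertices.rdd.getNumPartitions, col("id"))
  .distinct()
  .withColumn("long_id", monotonically_increasing_id())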

@felixcheung (Member) left a comment

could you rebase to pick up the fix for tests?
sounds like a good idea - do you know if there is a significant performance difference?

@@ -476,7 +476,7 @@ class GraphFrame private(
col(ATTR + "." + ID).cast("long").as(LONG_ID), col(ATTR + "." + ID).as(ID), col(ATTR))
} else {
val withLongIds = vertices.select(ID)
- .repartition(col(ID))
+ .repartition(vertices.rdd.getNumPartitions,col(ID))
Member

please add a space after ,

@felixcheung (Member)

actually it looks like a different problem with the build, let me investigate

@felixcheung (Member)

I've merged in the fix, please rebase

…er performance

    The repartition() method by default creates 200 partitions (or the fixed spark.sql.shuffle.partitions value),
    which causes performance problems for small graphs and OOM problems for large ones.
    To fix both problems, let's make the id mapping DF have the same number of partitions as the original vertex DF.
    I also found that the join still tries to repartition the result to 200 partitions for sophisticated DFs, so I repartition both parts of the join.
.distinct()
.sortWithinPartitions(ID)
.withColumn(LONG_ID, monotonically_increasing_id())
.persist(StorageLevel.MEMORY_AND_DISK)
- vertices.select(col(ID), nestAsCol(vertices, ATTR))
+ repartitionedVertices.select(col(ID), nestAsCol(repartitionedVertices, ATTR))
Author

I played with the Cassandra data source and with an HDFS-based dataset, and it looks like this join repartitions the resulting DF back to 200 partitions, so I explicitly repartition both parts of the join to the same number of partitions.
I use the "ID" column for partitioning both sides, so the join should not repartition either of them.
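
A rough sketch of that approach (illustrative only; idMapping is a stand-in for the id mapping DataFrame, not the exact code in the PR):

import org.apache.spark.sql.functions.col

// Repartition both sides of the join by the same key and the same partition count,
// so the join does not shuffle the result back to spark.sql.shuffle.partitions (200).
val numParts = vertices.rdd.getNumPartitions
val left = vertices.repartition(numParts, col("id"))
val right = idMapping.repartition(numParts, col("id"))
val joined = left.join(right, "id")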

…rsonalizedPageRank test.

So the test update is in this separate ticket.
@artem-aliev (Author)

I'm sorry for the delay with answers. Here is a better fix. It also fixes issue #235.
The performance test results are the same and depend strongly on the number of partitions in the provided graph.

def time[A](f: => A) = {val s = System.nanoTime; val ret = f; println("time: "+(System.nanoTime-s)/1e6+"ms"); ret }

import org.graphframes._
val g = examples.Graphs.friends
val results2 = time { g.pageRank.resetProbability(0.15).maxIter(10).run() }

without the fix: 72273ms
with the fix: 9996ms

If I coalesce the graph, I get an even better result with the fix:

val g2 = GraphFrame(g.vertices.coalesce(1), g.edges.coalesce(1))
val results2 = time { g2.pageRank.resetProbability(0.15).maxIter(10).run() }

7996ms

@codecov-io

codecov-io commented Sep 21, 2017

Codecov Report

Merging #236 into master will increase coverage by 0.01%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master    #236      +/-   ##
=========================================
+ Coverage   89.18%   89.2%   +0.01%     
=========================================
  Files          20      20              
  Lines         740     741       +1     
  Branches       40      43       +3     
=========================================
+ Hits          660     661       +1     
  Misses         80      80
Impacted Files                                     Coverage Δ
src/main/scala/org/graphframes/GraphFrame.scala    86.63% <100%> (+0.06%) ⬆️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c622de6...b3be789.

@felixcheung (Member) left a comment

Cool - thanks, that fixes personalized page rank.
Could you think of a more direct test for the change to indexedVertices?

@artem-aliev (Author)

Do you mean adding more unit tests? Give me some time, I'll try to create some...

@felixcheung (Member)

yes, to check that the changes to indexedVertices haven't broken anything

…ithIndex. add test

The DF monotonically_increasing_id() method assigns ids that are greater than max int,
since it puts the partition id in the higher 32 bits.
Some GraphX algorithms require ids to be less than max int, so monotonically_increasing_id() cannot be used.
RDD has a zipWithIndex method that provides proper indexing; DF has none, so we convert it to an RDD, add the index, and convert back.
A test that checks the correctness of id assignment is also added. It also checks that id < Int.MaxValue.
@artem-aliev (Author)

Sorry for the long delay. While I was doing other things, I realised that
the DF monotonically_increasing_id() method assigns ids that are greater than max int,
since it puts the partition id in the higher 32 bits, which is what caused issue #235. The algorithm requires ids to be less than max int.
RDD has a zipWithIndex method that provides proper indexing; DF has none, so I convert the DF to an RDD, add the index, and convert back. Performance is still as good as before, since most of the time is spent in the graph algorithm iterations.
A test that checks the correctness of id assignment is also added. It also checks that id < Int.MaxValue.
Is it OK, or are there other pitfalls I'm not aware of?
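
A self-contained sketch of that conversion (a hypothetical helper, not the exact code added in this PR):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Add a dense Long index column (0, 1, 2, ...) via RDD.zipWithIndex. Unlike
// monotonically_increasing_id(), which packs the partition id into the upper 32 bits,
// these ids stay below Int.MaxValue as long as the graph has fewer rows than that.
def withRowIndex(df: DataFrame, indexCol: String): DataFrame = {
  val indexed = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField(indexCol, LongType, nullable = false))
  df.sparkSession.createDataFrame(indexed, schema)
}

Note that zipWithIndex launches an extra Spark job to count rows per partition, but as the comment above says, the overall runtime is dominated by the graph algorithm iterations.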

val repartitionedVertices = vertices.repartition(vertices.rdd.getNumPartitions, col(ID))
val idDF = repartitionedVertices.select(ID)
val idType = idDF.schema(0)
// The DF monotonically_increasing_id() method assigns ids that are greater than max int,

@artem-aliev
zipWithIndex generates a Long index, doesn't it?
For large graphs (more than Int.MaxValue vertices) you want to have unique ids.
Can you elaborate?

Author

Right, this limitation still exists; it is a GraphX one. But I do not expect any graph algorithm to finish in reasonable time with more than Int.MaxValue vertices.


Well, I disagree: we have regular graphs with tens of billions of vertices.
Adding this limitation of 2,147,483,647 vertices will just limit GraphFrames usage.
