Make edgeStorageLevel and vertexStorageLevel configurable #225

Open: 13 commits to be merged into base branch master

estebandonato

This PR addresses issue #219. The properties intermediateVertexStorageLevel and intermediateEdgeStorageLevel, with their respective getters and setters, were added to the classes that represent graph algorithms with a GraphX implementation. These two properties are used to change the storage level of the GraphX instance referenced by GraphFrame.cachedTopologyGraphX before calling the GraphX algorithm implementation.

In the special case of the ConnectedComponents class, the properties were named graphxVertexStorageLevel and graphxEdgeStorageLevel to differentiate them from the intermediateStorageLevel property used in the GraphFrames implementation of this algorithm. The Python API of ConnectedComponents was also modified to include a similar feature that #213 implemented only for the Scala API.

These changes were applied to both the Scala and Python APIs.
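
To illustrate the shape of the change, here is a minimal sketch of how an algorithm class might expose the two properties. The class name `AlgorithmWithStorageLevels`, the setter and getter names, and the MEMORY_ONLY defaults are assumptions made for illustration, not the PR's actual code; only `StorageLevel` is a real Spark class.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical stand-in for an algorithm wrapper class; not the PR's actual code.
class AlgorithmWithStorageLevels {
  // Assumed defaults: MEMORY_ONLY for both vertices and edges.
  private var intermediateVertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  private var intermediateEdgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

  // Setter names are assumed; returning this.type allows call chaining.
  def setIntermediateVertexStorageLevel(value: StorageLevel): this.type = {
    intermediateVertexStorageLevel = value
    this
  }

  def setIntermediateEdgeStorageLevel(value: StorageLevel): this.type = {
    intermediateEdgeStorageLevel = value
    this
  }

  def getIntermediateVertexStorageLevel: StorageLevel = intermediateVertexStorageLevel

  def getIntermediateEdgeStorageLevel: StorageLevel = intermediateEdgeStorageLevel
}
```

With this shape, a caller could chain `setIntermediateVertexStorageLevel(StorageLevel.MEMORY_AND_DISK)` before running the algorithm, and each algorithm execution could use different settings.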

@codecov-io

codecov-io commented Aug 4, 2017

Codecov Report

Merging #225 into master will increase coverage by 0.72%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #225      +/-   ##
==========================================
+ Coverage   89.18%   89.91%   +0.72%     
==========================================
  Files          20       20              
  Lines         740      793      +53     
  Branches       40       44       +4     
==========================================
+ Hits          660      713      +53     
  Misses         80       80

| Impacted Files | Coverage Δ |
|---|---|
| ...n/scala/org/graphframes/lib/LabelPropagation.scala | 100% <100%> (ø) ⬆️ |
| ...cala/org/graphframes/lib/ConnectedComponents.scala | 95.37% <100%> (+0.42%) ⬆️ |
| .../graphframes/lib/StronglyConnectedComponents.scala | 100% <100%> (ø) ⬆️ |
| src/main/scala/org/graphframes/GraphFrame.scala | 86.95% <100%> (+0.38%) ⬆️ |
| ...graphframes/lib/ParallelPersonalizedPageRank.scala | 100% <100%> (ø) ⬆️ |
| ...main/scala/org/graphframes/lib/ShortestPaths.scala | 96.66% <100%> (+1.01%) ⬆️ |
| src/main/scala/org/graphframes/lib/PageRank.scala | 96.55% <100%> (+1.31%) ⬆️ |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 7948381...903b948.

@estebandonato
Author

oraclejdk7 is no longer available in Trusty, so I modified the Travis configuration to use openjdk7. More details: travis-ci/travis-ci#7964

I'm done with this PR and waiting for your feedback.

@felixcheung
Member

felixcheung commented Sep 23, 2017

Instead of having these in each GF method, I wonder if this should be a property of the "Graph" or "GraphFrame", for the "Edges" and "Vertices"?

@thunterdb @mengxr

.travis.yml Outdated
@@ -1,5 +1,5 @@
 jdk:
-  - oraclejdk7 # openJDK crashes sometimes
+  - openjdk7 # openJDK crashes sometimes
Member

can you rebase to pick up the latest?

Author

done!

@estebandonato
Author

@felixcheung actually, I considered implementing this feature the way you describe. However, I followed the current approach mainly to stay aligned with the API changes implemented in #213 and to allow different storage level settings for different algorithm executions. On the other hand, if we implement vertex and edge storage levels as GraphFrame properties, we would be aligned with the approach used in GraphX. Both options have their pros and cons. Just let me know what you think, and I can change the code if we decide to go with the properties approach.

@felixcheung
Member

felixcheung commented Sep 28, 2017

Right, I'm just worried about the impact of having two parameters for every single graph algo we have.
Let's open this up and see if anyone has a preference on the approach.

@mengxr
Contributor

mengxr commented Sep 28, 2017

@estebandonato Two comments:

  1. Why do you need to control the storage levels for vertices/edges in GraphX? I'm curious about the scenarios.

  2. To prevent adding too many parameters to APIs, we can introduce a new class and predefined constants:

class GraphStorageLevel(vertexStorageLevel, edgeStorageLevel)

object GraphStorageLevel {
  val MEMORY_ONLY = ...
  val ...
}

Then in each method we only need one arg to describe the storage levels. We can overload the current method to take this type. But this is only needed if 1) can be justified.
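
One way the suggestion above could be fleshed out is sketched below. This is only an illustration: the case class form, the `uniform` helper, and the particular constants chosen are assumptions, since the comment leaves the constant list open. `StorageLevel` and its `MEMORY_ONLY` and `MEMORY_AND_DISK` values are real Spark APIs.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical: bundles both storage levels so an algorithm needs a single argument.
case class GraphStorageLevel(
    vertexStorageLevel: StorageLevel,
    edgeStorageLevel: StorageLevel)

object GraphStorageLevel {
  // Assumed helper: use the same level for vertices and edges.
  def uniform(level: StorageLevel): GraphStorageLevel =
    GraphStorageLevel(level, level)

  // Assumed predefined constants; the real set would be up to the maintainers.
  val MEMORY_ONLY: GraphStorageLevel = uniform(StorageLevel.MEMORY_ONLY)
  val MEMORY_AND_DISK: GraphStorageLevel = uniform(StorageLevel.MEMORY_AND_DISK)
}
```

A mixed configuration would still be expressible, for example `GraphStorageLevel(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_AND_DISK)`, while the common cases stay a single named constant.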

@estebandonato
Author

estebandonato commented Oct 2, 2017

@mengxr regarding your question 1), we need to control storage levels in GraphX mainly because most of GraphFrame's graph algorithms are just wrappers around the GraphX implementations. This is the case for PageRank, LabelPropagation, and StronglyConnectedComponents, to name a few. These are all iterative algorithms that cache vertices and edges extensively on each iteration to avoid recomputation. Concretely, this is what the Pregel implementation in GraphX does: https://github.com/apache/spark/blob/v2.1.1/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala#L112. The storage levels used when caching a GraphX instance are defined in the Graph.apply method, as shown in the Spark code below:

def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

Currently, when GraphFrame has to call a GraphX algorithm, it first converts the GraphFrame instance to a GraphX instance, but there is no way to specify vertex and edge storage levels when creating the GraphX instance. As a consequence, the default MEMORY_ONLY storage level is used to cache the graph on each iteration. In some environments with limited memory, partitions that cannot fit in memory are not cached, so they are recomputed, and the cost of recomputing them grows with each new iteration. Specifically, what we have experienced, for instance with PageRank, is that performance degraded on each iteration. We have also seen the amount of shuffle reads for the same tasks increase on each iteration. As a workaround, we had to copy the wrapper code implemented in GraphFrame into our own code, changing the way a GraphFrame instance is converted to a GraphX instance so that we can set the storage levels (we set them to MEMORY_AND_DISK). With that workaround, response times and shuffle reads stopped degrading on each iteration. I hope this justifies the change. Eventually, if there are plans to migrate the GraphX implementations to custom GraphFrame implementations (as was done for connected components), we could reuse these parameters for the new implementations.
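
For readers who want to see what such a workaround looks like at the GraphX level, here is a rough sketch. It is not the code we used: the object name `GraphXStorageLevelSketch`, the helper names, and the `String`/`Double` attribute types are made up for illustration. It relies only on `Graph.apply` with the parameters quoted above and on GraphX's `staticPageRank`.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper object; names and attribute types are illustrative only.
object GraphXStorageLevelSketch {

  // Builds a GraphX graph whose vertex and edge RDDs use `level` as their
  // target storage level instead of the MEMORY_ONLY default.
  def buildGraph(
      vertices: RDD[(VertexId, String)],
      edges: RDD[Edge[Double]],
      level: StorageLevel = StorageLevel.MEMORY_AND_DISK): Graph[String, Double] =
    Graph(vertices, edges, defaultVertexAttr = "",
      edgeStorageLevel = level, vertexStorageLevel = level)

  // Runs a fixed number of PageRank iterations on the graph built above.
  def pageRank(graph: Graph[String, Double], numIter: Int): Graph[Double, Double] =
    graph.staticPageRank(numIter)
}
```

With MEMORY_AND_DISK, partitions that do not fit in memory are spilled to disk rather than dropped, so later iterations can read them back instead of recomputing them.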

Regarding 2), if we want to avoid adding too many parameters to the API, another alternative is to have a single storage level value per algorithm and apply it to both vertices and edges.

Let me know your feedback to make the changes accordingly.

@estebandonato
Author

@mengxr did you have a chance to read the reasons for this change explained above? Please let me know your thoughts.

@estebandonato
Author

@mengxr any update on this?
