
[178] AggregateMessages: multiple message and aggregation columns #186


Open · estebandonato wants to merge 10 commits into master

Conversation

@estebandonato commented Apr 24, 2017

This PR addresses issue #178. There are two changes in the AggregateMessages class:

  1. You can call the sendToSrc and sendToDst methods with multiple columns or SQL expressions if you need to include multiple vertex and/or edge columns in the messages sent to the vertices.
  2. You can call the agg method with multiple aggregation functions if you need different aggregations over the messages (see the sketch after this description).

Regarding 1: sendToSrc and sendToDst now accept varargs of org.apache.spark.sql.Column or String instances, and when the message is created all of these columns are grouped into a struct type. If both sendToSrc and sendToDst are called with multiple columns, the two structs must be equal (same number of columns, column names, and types).

Regarding 2: the aggregation functions are all executed over the DataFrame that represents the messages. The resulting DataFrame contains the vertex ID plus one column per aggregation function passed to agg, holding the result of that aggregation.
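A minimal usage sketch of the proposed API (not from the PR itself: the graph g, the column names, and the AM.msg("age") field access are illustrative assumptions, based on the struct-typed MSG column described later in this thread):

    import org.apache.spark.sql.functions.{avg, max}
    import org.graphframes.lib.AggregateMessages

    // Assumes g is a GraphFrame whose vertices have columns id, name, age.
    val AM = AggregateMessages

    val results = g.aggregateMessages
      .sendToDst(AM.src("name"), AM.src("age"))   // multiple columns in one message
      .agg(
        avg(AM.msg("age")).as("avgSrcAge"),       // multiple aggregations over the
        max(AM.msg("age")).as("maxSrcAge"))       // fields of the MSG struct

    results.show()  // columns: id, avgSrcAge, maxSrcAge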

…thods as well as multiple aggregation functions
@felixcheung (Member) commented Apr 26, 2017

could you rebase this please?
and also, could you check the CI test failure?

@felixcheung felixcheung changed the title AggregateMessages: multiple message and aggregation columns [178] AggregateMessages: multiple message and aggregation columns Apr 26, 2017
@estebandonato (Author)

sure, will work on that.
Regarding the test failures, it seems my change is not compatible with spark.version=1.6.3 & scala.version=2.10.6.

Will check both and let you know.

@estebandonato (Author)

Code rebased, and the test failures caused by the incompatibility with Spark v1.6.3 are fixed. What is failing now are the Python tests, because of the recently added Python aggregateMessages API. Working on that.

@@ -239,19 +240,27 @@ def aggregateMessages(self, aggCol, sendToSrc=None, sendToDst=None):
        if sendToSrc is not None:
            if isinstance(sendToSrc, Column):
                builder.sendToSrc(sendToSrc._jc)
            elif isinstance(msgToSrc, list):
                for col in msgToSrc:
                    builder.sendToSrc(col._jc)
@felixcheung (Member) commented:
should probably check that each of these cols is a Column?

@estebandonato (Author) commented:
absolutely, this is still wip

case columns => df.select(
df(ID),
struct(columns.sorted.map(c => col(s"`${c}`").as(removeStructName(c))) :_*)
.as(AggregateMessages.MSG_COL_NAME))
@felixcheung (Member) commented:
can you explain and comment why this and associated helper functions are necessary?

@estebandonato (Author) commented:

This is the way AggregateMessages works: it starts with the DataFrame returned by GraphFrame.triplets, which has the columns "src", "dst", and "edge", each of struct type with one struct field per original attribute.

When only one attribute per message was supported, that attribute was used in the projection of a query over this DataFrame. So let's say I have the following graph:

GraphFrame(v:[id: string, name: string, age: int], e:[src: string, dst: string, relationship: string])

If I wanted to send vertex attribute "name" from src to dst, that would generate the following query:

select dst['id'] AS id, src['name'] AS MSG from df.triplets

in the inverse direction

select src['id'] AS id, dst['name'] AS MSG from df.triplets

and in both directions:

select dst['id'] AS id, src['name'] AS MSG from df.triplets UNION select src['id'] AS id, dst['name'] AS MSG from df.triplets

Now, with the changes I introduced, multiple attributes can be sent in a message. This is resolved by grouping all the attributes into a struct type. Following the same example, if I wanted to send both attributes "name" and "age" from src to dst, the new query would be:

select dst['id'] AS id, struct(src['name'], src['age']) AS MSG from df.triplets

and similar for the other directions.

So this solution brings some challenges:

When I try to create a struct from existing struct fields, the generated DataFrame names each struct element col1, col2, ..., colN rather than keeping the original names. With the example above, try the following:

g.triplets.select(struct(expr("src['id']")).as("MSG")).printSchema
g.triplets.select(struct(col("src")("id")).as("MSG")).printSchema

and you will see what I mean. What the snippet quoted above basically does is create the struct while keeping the original attribute names.

Additionally, what removeStructName and the other helper functions do is the following: if I kept the original qualified names as described above, I would end up with the following struct:

struct("src.name", "src.age") as MSG

that represents the message sent from src to dst. However, the message in the opposite direction would be:

struct("dst.name", "dst.age") as MSG

Since the two structs have different field names, the union of the two DataFrames doesn't work. The helper method therefore strips the struct prefix from the columns, so both directions end up with the same struct:

struct("name", "age") as MSG

This way the union is possible.
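A minimal sketch of this rename-then-union trick (my illustration, not the PR's exact code; variable names are hypothetical):

    import org.apache.spark.sql.functions.{col, struct}

    val triplets = g.triplets

    // Re-alias each field inside the struct to drop the src/dst prefix.
    val toDst = triplets.select(
      col("dst")("id").as("id"),
      struct(col("src")("name").as("name"), col("src")("age").as("age")).as("MSG"))
    val toSrc = triplets.select(
      col("src")("id").as("id"),
      struct(col("dst")("name").as("name"), col("dst")("age").as("age")).as("MSG"))

    // Both MSG structs now have the fields (name, age), so the union works.
    val messages = toDst.union(toSrc)  // unionAll on Spark 1.6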

Last but not least, I found that in Spark 1.6 the default column name set in a DataFrame when you select a struct element is struct_name[struct_element] (e.g. src[name]), while in Spark 2.x it is struct_name.struct_element (e.g. src.name). That's why you will see the functions removeStructNameForSpark2 and removeStructNameForSpark16.
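The two helpers might look roughly like this (the names come from the comment above; the bodies are my guess):

    // Hypothetical implementations of the helpers mentioned above.
    def removeStructNameForSpark2(c: String): String =
      c.split('.').last                        // "src.name"  -> "name"

    def removeStructNameForSpark16(c: String): String =
      c.stripSuffix("]").split('[').last       // "src[name]" -> "name"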

Hope this clarifies your doubts. Please let me know if you have further questions

@estebandonato (Author)

One check failed due to an OutOfMemoryError while running the test_connected_components_friends (graphframes.tests.GraphFrameLibTest) unit test, which has nothing to do with the changes in this PR. Has this ever happened before? Do we need to increase the heap assigned to the Travis jobs?

@felixcheung (Member)

Hmm, not sure. If you look at PR #195, a recent PR related to connected components, the tests seem to be passing.

@codecov-io commented May 21, 2017

Codecov Report

Merging #186 into master will not change coverage.
The diff coverage is 100%.

Impacted file tree graph

@@           Coverage Diff           @@
##           master     #186   +/-   ##
=======================================
  Coverage   89.18%   89.18%           
=======================================
  Files          20       20           
  Lines         740      740           
  Branches       40       39    -1     
=======================================
  Hits          660      660           
  Misses         80       80
Impacted Files Coverage Δ
.../scala/org/graphframes/lib/AggregateMessages.scala 93.54% <100%> (ø) ⬆️

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 7948381...dc6b0df.

@estebandonato (Author)

@felixcheung I think PR #195 is about fixing the non-deterministic ID assignment when a GraphFrame is converted to GraphX and the vertex ID is not convertible to a Long. So even though the bug was detected in the connected components module, it affects every module, class, or piece of code that calls GraphFrame.toGraphX, i.e. connected components, LPA, PageRank, etc. In fact, the fix doesn't involve changes to the connected components class. I know because I've been following that PR very closely, having experienced the same bug with LPA and a proprietary algorithm implemented with Pregel.

Anyway, I just pushed some minor changes (docs, validations, etc) and now all checks have passed.

So this is a first version of this new feature. I'm planning to keep working on resolving the first limitation I listed above: "if both sendToSrc & sendToDst methods are called, then you have to call both methods with the same columns"

In the meantime if you can review this change and provide feedback, I would really appreciate it.

@felixcheung (Member)

@estebandonato thanks for adding the details! I'm hoping to get back and review in a week or two. Sorry about the delay.

@estebandonato (Author)

@felixcheung did you have a chance to review this PR? In the meantime, since Spark 1.6 is no longer supported, I will remove some helper functions to simplify the code.

@felixcheung (Member)

sorry for dropping this - please update - I should have some time this week to review. thanks!

@felixcheung (Member)

would you like to follow up?

@estebandonato (Author)

Yes, I'll be finishing this PR this week by removing support for Spark 1.6. Aside from that change, the feature is complete. Sorry for the delay.

@estebandonato (Author)

@felixcheung I'm finally done with this feature. In the last commit I removed support for Spark 1.6 and also did some refactoring that makes the code much simpler to follow. I also changed the API: if you want to send multiple columns in a message, rather than calling sendToSrc or sendToDst multiple times, you now call each once, since both methods accept varargs of org.apache.spark.sql.Column, or of String in the case of SQL expressions. As part of this refactoring I also resolved one of the limitations detailed in my first message: "you cannot call sendToSrc or sendToDst with vertex and edge columns with the same name; otherwise there will be a column name collision."
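Continuing the earlier sketch, the refactored calls might look like this (the String SQL-expression syntax is my assumption):

    import org.apache.spark.sql.functions.count
    val AM = org.graphframes.lib.AggregateMessages

    val results = g.aggregateMessages
      .sendToDst(AM.src("name"), AM.src("age"))   // Column varargs in a single call
      .sendToSrc("dst['name']", "dst['age']")     // or String SQL expressions
      .agg(count(AM.msg("name")).as("numMsgs"))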

Please review the solution and let me know your feedback.

@felixcheung (Member)

Ah, thanks, I really like this newer approach; it seems to make for a much cleaner solution.
Could you please update the PR description here #186 (comment) to reflect the latest?

I'll review this in more details shortly.

@estebandonato (Author)

@felixcheung done! I updated the #186 description with the latest.

Let me know your feedback when you review this.

@SidWeng commented Jul 3, 2018

really need this feature! what's the status now?

@estebandonato (Author)

@SidWeng this feature is pending review by @felixcheung.
@felixcheung when do you think you can look at this?

@tchow-notion

@estebandonato hi, any updates on this feature?

@estebandonato (Author)

@tchow-notion no updates, unfortunately. Apparently it was not prioritized for review, and the PR is now stale. Unless somebody has a different opinion, we should close this.
