[178] AggregateMessages: multiple message and aggregation columns #186
Conversation
…thods as well as multiple aggregation functions
Could you rebase this please?
Sure, will work on that. Will check both and let you know.
Code rebased, and the test failures caused by incompatibility with Spark v1.6.3 are fixed. What is failing now are the Python tests, because of the aggregateMessages API recently added in Python. Working on that.
…in message as well as multiple aggregation functions
python/graphframes/graphframe.py
Outdated
@@ -239,19 +240,27 @@ def aggregateMessages(self, aggCol, sendToSrc=None, sendToDst=None):
        if sendToSrc is not None:
            if isinstance(sendToSrc, Column):
                builder.sendToSrc(sendToSrc._jc)
            elif isinstance(sendToSrc, list):
                for col in sendToSrc:
                    builder.sendToSrc(col._jc)
should probably check that each of these cols is a Column?
absolutely, this is still wip
case columns => df.select(
  df(ID),
  struct(columns.sorted.map(c => col(s"`${c}`").as(removeStructName(c))) :_*)
    .as(AggregateMessages.MSG_COL_NAME))
can you explain and comment why this and associated helper functions are necessary?
This is the way AggregateMessages works: it starts with a DataFrame resulting from calling GraphFrame.triplets, which generates a DF with columns "src", "dst", and "edge", each of which is of struct type with one struct element per original attribute.
When only one attribute per AggregateMessages call was supported, that attribute was used in the projection of a query over this DF. So let's say I have the following graph:
GraphFrame(v:[id: string, name: string, age: int], e:[src: string, dst: string, relationship: string])
If I wanted to send vertex attribute "name" from src to dst, that would generate the following query:
select dst['id'] AS id, src['name'] AS MSG from df.triplets
in the inverse direction
select src['id'] AS id, dst['name'] AS MSG from df.triplets
and in both directions:
select dst['id'] AS id, src['name'] AS MSG from df.triplets UNION select src['id'] AS id, dst['name'] AS MSG from df.triplets
Now, with the changes I introduced, multiple attributes can be sent in a message. The way it is resolved is to group all the attributes into a struct type. So, following the same example, if I wanted to send both attributes "name" and "age" from src to dst, the new query would be:
select dst['id'] AS id, struct(src['name'], src['age']) AS MSG from df.triplets
and similar for the other directions.
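The projection logic described above can be sketched as plain string manipulation. This is a hypothetical helper (build_msg_query is not part of the PR), shown only to make the query shapes above concrete:

```python
def build_msg_query(direction, attrs):
    """Derive the triplets projection for a message.

    direction: "toDst" sends src attributes to dst vertices; "toSrc" the reverse.
    attrs: the vertex attributes to include in the message.
    """
    agg_side, msg_side = ("dst", "src") if direction == "toDst" else ("src", "dst")
    if len(attrs) == 1:
        # Single attribute: project it directly as MSG
        msg = "{0}['{1}'] AS MSG".format(msg_side, attrs[0])
    else:
        # Multiple attributes: group them into a struct, as the PR does
        cols = ", ".join("{0}['{1}']".format(msg_side, a) for a in attrs)
        msg = "struct({0}) AS MSG".format(cols)
    return "select {0}['id'] AS id, {1} from df.triplets".format(agg_side, msg)
```

Running it on the example graph reproduces the queries shown above, e.g. `build_msg_query("toDst", ["name", "age"])` yields the struct-based query.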
So this solution brings some challenges:
When I try to create a struct from existing struct elements, the newly generated DF names each element col1, col2... colN rather than keeping the original names. With the example above, try the following:
g.triplets.select(struct(expr("src['id']")).as("MSG")).printSchema
g.triplets.select(struct(col("src")("id")).as("MSG")).printSchema
and you will see what I'm saying. So what line 133 basically does is create the struct while maintaining the original attribute names.
Additionally, what removeStructName and the other helper functions do is the following: if I kept the original names as I said above, then I would end up with the following struct:
struct("src.name", "src.age") as MSG
that represents the message sent from src to dst. However, the message in the opposite direction would be:
struct("dst.name", "dst.age") as MSG
Since both structs have different element names, the union between both DFs doesn't work. So the helper method removes the struct name from the columns, so that both directions end up with the following struct:
struct("name", "age") as MSG
This way the union is possible.
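In rough Python terms (a hypothetical mirror of the PR's Scala removeStructName helper, not the actual code), the renaming that makes the union legal looks like this:

```python
def remove_struct_name(qualified):
    # "src.name" -> "name": drop the struct prefix from a qualified column name
    return qualified.split(".", 1)[-1]

# Field names of the two message structs before normalization differ...
src_msg_fields = [remove_struct_name(c) for c in ["src.name", "src.age"]]
dst_msg_fields = [remove_struct_name(c) for c in ["dst.name", "dst.age"]]
# ...but after stripping the struct name, both directions expose the same
# field names, so the union of the two DataFrames is possible.
assert src_msg_fields == dst_msg_fields == ["name", "age"]
```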
Last but not least, I found that while in Spark 1.6 the default column name set in a DF when you select a struct element is struct_name[struct_element], i.e. src[name], in Spark 2.x it is struct_name.struct_element, i.e. src.name. That's why you will see the functions removeStructNameForSpark2 and removeStructNameForSpark16.
Hope this clarifies your doubts. Please let me know if you have further questions.
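Based on that description, the two version-specific helpers might look roughly like this in Python (behavior inferred from the comment above, not taken from the PR's Scala code):

```python
import re

def remove_struct_name_spark16(name):
    # Spark 1.6 names a selected struct element "src[name]" -> extract "name"
    m = re.fullmatch(r"\w+\[(.+)\]", name)
    return m.group(1) if m else name

def remove_struct_name_spark2(name):
    # Spark 2.x names it "src.name" -> extract "name"
    return name.split(".", 1)[1] if "." in name else name
```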
One check failed due to an OutOfMemory error while running the test_connected_components_friends (graphframes.tests.GraphFrameLibTest) unit test, which has nothing to do with the changes in this PR. Has this ever happened before? Do we have to increase the heap assigned to the jobs in Travis?
hmm, not sure. If you look at PR #195, which is a recent PR related to connected components, tests seem to be passing.
Codecov Report
@@ Coverage Diff @@
## master #186 +/- ##
=======================================
Coverage 89.18% 89.18%
=======================================
Files 20 20
Lines 740 740
Branches 40 39 -1
=======================================
Hits 660 660
Misses 80 80
@felixcheung I think PR #195 is about fixing the non-deterministic ID assignment when a GraphFrame is converted to GraphX and the vertexId is not convertible to a Long type. So even though the bug was detected in the connected components module, it affects every module/class or piece of code that calls the GraphFrame.toGraphX method, i.e. connected components, lpa, pagerank, etc. Actually the fix doesn't involve changes in the connected components class. I know because I've been following that PR very closely, since I experienced the same bug using lpa and a proprietary algorithm implemented with Pregel. Anyway, I just pushed some minor changes (docs, validations, etc.) and now all checks have passed. So this is a first version of this new feature. I'm planning to keep working on resolving the first limitation I listed above: "if both sendToSrc & sendToDst methods are called, then you have to call both methods with the same columns". In the meantime, if you can review this change and provide feedback, I would really appreciate it.
@estebandonato thanks for adding the details! I'm hoping to get back and review in a week or two. Sorry about the delay.
@felixcheung did you have a chance to review this PR? In the meantime, since Spark 1.6 is no longer supported, I will remove some helper functions to simplify the code.
Sorry for dropping this - please update - I should have some time this week to review. Thanks!
Would you like to follow up?
Yes, I'll be finishing this PR this week, removing support for Spark 1.6. Besides this change, the feature is complete. Sorry for the delay.
@felixcheung finally I'm done with this feature. In the last commit I removed support for Spark 1.6 and also performed some code refactoring which makes the code much simpler to follow. I also changed the API: now if you want to send multiple columns in a message, rather than having to call sendToSrc or sendToDst multiple times, you call it once, since both methods accept varargs. Please review the solution and let me know your feedback.
Ah, thanks, I really like this newer approach; it seems to make for a much cleaner solution. I'll review this in more detail shortly.
@felixcheung done! I updated the #186 description with the latest. Let me know your feedback when you review this.
Really need this feature! What's the status now?
@SidWeng this feature is pending review by @felixcheung
@estebandonato hi, any updates on this feature?
@tchow-notion no updates unfortunately. Apparently it was not prioritized for review and now the PR is stale. Unless somebody has a different opinion, we should cancel this.
This PR addresses issue #178. There are 2 changes in the AggregateMessages class:
1. The sendToSrc & sendToDst methods now accept varargs of org.apache.spark.sql.Column or String instances, and when the message is created all these columns are grouped into a struct type. If both sendToSrc & sendToDst methods are called with multiple columns, both structs (number of columns, column names and types) have to be equal.
2. Multiple aggregation functions are executed over the DataFrame that represents the messages. The resulting DataFrame contains the vertexId plus one column per aggregation function passed to the agg method, with the result of each aggregation.
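The equality constraint between the two structs could be enforced by a check along these lines. This is a hypothetical sketch of the validation, not the PR's actual code:

```python
def check_message_structs(src_fields, dst_fields):
    """src_fields / dst_fields: lists of (column name, data type) tuples
    describing the structs built by sendToSrc and sendToDst respectively."""
    if len(src_fields) != len(dst_fields):
        raise ValueError("sendToSrc and sendToDst must use the same number of columns")
    for (s_name, s_type), (d_name, d_type) in zip(src_fields, dst_fields):
        if (s_name, s_type) != (d_name, d_type):
            raise ValueError("mismatched message column: {0}:{1} vs {2}:{3}".format(
                s_name, s_type, d_name, d_type))

# Matching structs pass silently; a mismatch in name or type raises ValueError.
check_message_structs([("name", "string"), ("age", "int")],
                      [("name", "string"), ("age", "int")])
```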