feat(pregel): automatically skip second join when dst columns not needed#795
Draft
james-willis wants to merge 6 commits intographframes:mainfrom
Draft
feat(pregel): automatically skip second join when dst columns not needed#795james-willis wants to merge 6 commits intographframes:mainfrom
james-willis wants to merge 6 commits intographframes:mainfrom
Conversation
Implements automatic optimization for Pregel triplet generation that skips the second join (adding destination vertex state) when no message expressions reference dst.* columns. The optimization works by: 1. Analyzing all message expressions before the iteration loop 2. Extracting column prefixes (src, dst, edge) from the expression AST 3. Skipping the dst vertex join if no dst.* columns are referenced AND skipMessagesFromNonActiveVertices is disabled This provides significant performance improvement for algorithms like PageRank, directed LabelPropagation, and DetectingCycles that only need source vertex or edge columns in their message expressions. Closes graphframes#790
james-willis
commented
Feb 6, 2026
core/src/main/scala-spark-3/org/apache/spark/sql/graphframes/SparkShims.scala
Outdated
Show resolved
Hide resolved
…id when skipping join The previous implementation incorrectly checked both the target ID expression and message expression for dst.* references. Since sendMsgToDst uses Pregel.dst(ID) as the target, it would always detect 'dst' as referenced even when the message itself only used src columns. This fix: 1. Only analyzes the message expressions (not target ID) for dst.* references 2. When skipping the join, creates a minimal dst struct with just the id from edge_dst so that sendMsgToDst can still route messages correctly Added test: 'sendMsgToDst with only src columns in message' to verify the optimization works correctly when dst.id is implicitly used for routing.
- Add extractColumnReferences to SparkShims returning Map[String, Set[String]] to track which specific fields are accessed under each prefix - Handle resolved expressions (AttributeReference, GetStructField) in addition to unresolved ones for more robust column detection - Update Pregel optimization to skip dst join when only dst.id is referenced since dst.id is available from the edge's dst column - Change optimization log message from logInfo to logDebug - Add test for dst.id-only reference case
- Remove unused extractColumnPrefixes method from SparkShims (both Spark 3/4) - Refactor Pregel.scala to parse expressions once instead of twice - Add documentation for deeply nested struct access fallback behavior
…n optimization - Add SparkShimsSuite with 22 unit tests for column reference extraction - Add 4 integration tests to PregelSuite for complex dst usage patterns - Fix UTF8String handling in UnresolvedExtractValue pattern matching
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements automatic optimization for Pregel triplet generation that skips the second join (adding destination vertex state) when no message expressions reference
dst.*columns.Closes #790
Changes
extractColumnPrefixeshelper method toSparkShims(both Spark 3 and Spark 4 versions) to analyze Column expressions and extract column name prefixesPregel.run()to detect if destination vertex state is needed by analyzing all message expressionsdst.*columns are not referenced ANDskipMessagesFromNonActiveVerticesis disabled, the second join is automatically skippedHow it works
sendMsgToSrc/sendMsgToDst) to collect which column prefixes are referenceddst.*columns, skip the second join entirelyAlgorithms that benefit
Pregel.src("rank")andPregel.src("outDegree")Pregel.src(LABEL_ID)Pregel.src(storedSeqCol)andPregel.dst(GraphFrame.ID), but ID is available from the join keyTesting
All 227 core tests pass, including 5 new tests specifically for this optimization:
automatic dst join skipping - PageRank only uses src columnsautomatic dst join NOT skipped when dst columns are referencedautomatic dst join NOT skipped when skipMessagesFromNonActiveVertices is enabledautomatic dst join skipping - sendMsgToSrc with only edge columnsautomatic dst join skipping - edge columns only