Skip to content

feat(pregel): automatically skip second join when dst columns not needed#795

Draft
james-willis wants to merge 6 commits intographframes:mainfrom
james-willis:feat/auto-skip-dst-join-790
Draft

feat(pregel): automatically skip second join when dst columns not needed#795
james-willis wants to merge 6 commits intographframes:mainfrom
james-willis:feat/auto-skip-dst-join-790

Conversation

@james-willis
Copy link
Collaborator

Summary

Implements automatic optimization for Pregel triplet generation that skips the second join (adding destination vertex state) when no message expressions reference dst.* columns.

Closes #790

Changes

  • Added extractColumnPrefixes helper method to SparkShims (both Spark 3 and Spark 4 versions) to analyze Column expressions and extract column name prefixes
  • Modified Pregel.run() to detect if destination vertex state is needed by analyzing all message expressions
  • When dst.* columns are not referenced AND skipMessagesFromNonActiveVertices is disabled, the second join is automatically skipped
  • Added 5 comprehensive tests for the optimization

How it works

  1. Detection phase (before iteration loop): Analyze all message expressions (sendMsgToSrc/sendMsgToDst) to collect which column prefixes are referenced
  2. Conditional join: If no expressions reference dst.* columns, skip the second join entirely
val needsDstState = allReferencedPrefixes.contains(DST) || skipMessagesFromNonActiveVertices

Algorithms that benefit

  • PageRank: Only uses Pregel.src("rank") and Pregel.src("outDegree")
  • LabelPropagation (directed mode): Only uses Pregel.src(LABEL_ID)
  • DetectingCycles: Uses Pregel.src(storedSeqCol) and Pregel.dst(GraphFrame.ID), but ID is available from the join key

Testing

All 227 core tests pass, including 5 new tests specifically for this optimization:

  • automatic dst join skipping - PageRank only uses src columns
  • automatic dst join NOT skipped when dst columns are referenced
  • automatic dst join NOT skipped when skipMessagesFromNonActiveVertices is enabled
  • automatic dst join skipping - sendMsgToSrc with only edge columns
  • automatic dst join skipping - edge columns only

Implements automatic optimization for Pregel triplet generation that
skips the second join (adding destination vertex state) when no message
expressions reference dst.* columns.

The optimization works by:
1. Analyzing all message expressions before the iteration loop
2. Extracting column prefixes (src, dst, edge) from the expression AST
3. Skipping the dst vertex join if no dst.* columns are referenced
   AND skipMessagesFromNonActiveVertices is disabled

This provides significant performance improvement for algorithms like
PageRank, directed LabelPropagation, and DetectingCycles that only
need source vertex or edge columns in their message expressions.

Closes graphframes#790
…id when skipping join

The previous implementation incorrectly checked both the target ID expression
and message expression for dst.* references. Since sendMsgToDst uses
Pregel.dst(ID) as the target, it would always detect 'dst' as referenced
even when the message itself only used src columns.

This fix:
1. Only analyzes the message expressions (not target ID) for dst.* references
2. When skipping the join, creates a minimal dst struct with just the id
   from edge_dst so that sendMsgToDst can still route messages correctly

Added test: 'sendMsgToDst with only src columns in message' to verify
the optimization works correctly when dst.id is implicitly used for routing.
- Add extractColumnReferences to SparkShims returning Map[String, Set[String]]
  to track which specific fields are accessed under each prefix
- Handle resolved expressions (AttributeReference, GetStructField) in addition
  to unresolved ones for more robust column detection
- Update Pregel optimization to skip dst join when only dst.id is referenced
  since dst.id is available from the edge's dst column
- Change optimization log message from logInfo to logDebug
- Add test for dst.id-only reference case
- Remove unused extractColumnPrefixes method from SparkShims (both Spark 3/4)
- Refactor Pregel.scala to parse expressions once instead of twice
- Add documentation for deeply nested struct access fallback behavior
…n optimization

- Add SparkShimsSuite with 22 unit tests for column reference extraction
- Add 4 integration tests to PregelSuite for complex dst usage patterns
- Fix UTF8String handling in UnresolvedExtractValue pattern matching
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: add a way to skip second join for triplets generation in Pregel

1 participant