@@ -26,45 +26,43 @@ import org.graphframes.GraphFrame
2626import org .graphframes .examples .Graphs .gridIsingModel
2727import org .graphframes .lib .AggregateMessages
2828
29-
3029/**
3130 * Example code for Belief Propagation (BP)
3231 *
33- * This provides a template for building customized BP algorithms for different types of
34- * graphical models.
32+ * This provides a template for building customized BP algorithms for different types of graphical
33+ * models.
3534 *
3635 * This example:
37- * - Ising model on a grid
38- * - Parallel Belief Propagation using colored fields
36+ * - Ising model on a grid
37+ * - Parallel Belief Propagation using colored fields
3938 *
40- * Ising models are probabilistic graphical models over binary variables x,,i,,.
41- * Each binary variable x,,i,, corresponds to one vertex, and it may take values -1 or +1.
42- * The probability distribution P(X) (over all x,,i,,) is parameterized by vertex factors a,,i,,
43- * and edge factors b,,ij,,:
39+ * Ising models are probabilistic graphical models over binary variables x,,i,,. Each binary
40+ * variable x,,i,, corresponds to one vertex, and it may take values -1 or +1. The probability
41+ * distribution P(X) (over all x,,i,,) is parameterized by vertex factors a,,i,, and edge factors
42+ * b,,ij,,:
4443 * {{{
4544 * P(X) = (1/Z) * exp[ \sum_i a_i x_i + \sum_{ij} b_{ij} x_i x_j ]
4645 * }}}
47- * where Z is the normalization constant (partition function).
48- * See [[https://en.wikipedia.org/wiki/Ising_model Wikipedia ]] for more information on Ising models.
46+ * where Z is the normalization constant (partition function). See
47+ * [[https://en.wikipedia.org/wiki/Ising_model Wikipedia]] for more information on Ising models.
4948 *
5049 * Belief Propagation (BP) provides marginal probabilities of the values of the variables x,,i,,,
51- * i.e., P(x,,i,,) for each i. This allows a user to understand likely values of variables.
52- * See [[https://en.wikipedia.org/wiki/Belief_propagation Wikipedia ]] for more information on BP.
50+ * i.e., P(x,,i,,) for each i. This allows a user to understand likely values of variables. See
51+ * [[https://en.wikipedia.org/wiki/Belief_propagation Wikipedia]] for more information on BP.
5352 *
5453 * We use a batch synchronous BP algorithm, where batches of vertices are updated synchronously.
5554 * We follow the mean field update algorithm in Slide 13 of the
56- * [[http://www.eecs.berkeley.edu/~wainwrig/Talks/A_GraphModel_Tutorial talk slides ]] from:
57- * Wainwright. "Graphical models, message-passing algorithms, and convex optimization."
55+ * [[http://www.eecs.berkeley.edu/~wainwrig/Talks/A_GraphModel_Tutorial talk slides]] from:
56+ * Wainwright. "Graphical models, message-passing algorithms, and convex optimization."
5857 *
59- * The batches are chosen according to a coloring. For background on graph colorings for inference,
60- * see for example:
61- * Gonzalez et al. "Parallel Gibbs Sampling: From Colored Fields to Thin Junction Trees."
62- * AISTATS, 2011.
58+ * The batches are chosen according to a coloring. For background on graph colorings for
59+ * inference, see for example: Gonzalez et al. "Parallel Gibbs Sampling: From Colored Fields to
60+ * Thin Junction Trees." AISTATS, 2011.
6361 *
6462 * The BP algorithm works by:
65- * - Coloring the graph by assigning a color to each vertex such that no neighboring vertices
66- * share the same color.
67- * - In each step of BP, update all vertices of a single color. Alternate colors.
63+ * - Coloring the graph by assigning a color to each vertex such that no neighboring vertices
64+ * share the same color.
65+ * - In each step of BP, update all vertices of a single color. Alternate colors.
6866 */
6967object BeliefPropagation {
7068
@@ -94,14 +92,16 @@ object BeliefPropagation {
9492 }
9593
9694 /**
97- * Given a GraphFrame, choose colors for each vertex. No neighboring vertices will share the
98- * same color. The number of colors is minimized.
95+ * Given a GraphFrame, choose colors for each vertex. No neighboring vertices will share the
96+ * same color. The number of colors is minimized.
9997 *
10098 * This is written specifically for grid graphs. For non-grid graphs, it should be generalized,
10199 * such as by using a greedy coloring scheme.
102100 *
103- * @param g Grid graph generated by [[org.graphframes.examples.Graphs.gridIsingModel() ]]
104- * @return Same graph, but with a new vertex column "color" of type Int (0 or 1)
101+ * @param g
102+ * Grid graph generated by `org.graphframes.examples.Graphs.gridIsingModel()`
103+ * @return
104+ * Same graph, but with a new vertex column "color" of type Int (0 or 1)
105105 */
106106 private def colorGraph (g : GraphFrame ): GraphFrame = {
107107 val colorUDF = udf { (i : Int , j : Int ) => (i + j) % 2 }
@@ -112,19 +112,22 @@ object BeliefPropagation {
112112 /**
113113 * Run Belief Propagation.
114114 *
115- * This implementation of BP shows how to use GraphX's aggregateMessages method.
116- * It is simple to convert to and from GraphX format. This method does the following:
117- * - Color GraphFrame vertices for BP scheduling.
118- * - Convert GraphFrame to GraphX format.
119- * - Run BP using GraphX's aggregateMessages API.
120- * - Augment the original GraphFrame with the BP results (vertex beliefs).
115+ * This implementation of BP shows how to use GraphX's aggregateMessages method. It is simple to
116+ * convert to and from GraphX format. This method does the following:
117+ * - Color GraphFrame vertices for BP scheduling.
118+ * - Convert GraphFrame to GraphX format.
119+ * - Run BP using GraphX's aggregateMessages API.
120+ * - Augment the original GraphFrame with the BP results (vertex beliefs).
121121 *
122- * @param g Graphical model created by `org.graphframes.examples.Graphs.gridIsingModel()`
123- * @param numIter Number of iterations of BP to run. One iteration includes updating each
124- * vertex's belief once.
125- * @return Same graphical model, but with [[GraphFrame.vertices ]] augmented with a new column
126- * "belief" containing P(x,,i,, = +1), the marginal probability of vertex i taking
127- * value +1 instead of -1.
122+ * @param g
123+ * Graphical model created by `org.graphframes.examples.Graphs.gridIsingModel()`
124+ * @param numIter
125+ * Number of iterations of BP to run. One iteration includes updating each vertex's belief
126+ * once.
127+ * @return
128+ * Same graphical model, but with [[GraphFrame.vertices ]] augmented with a new column "belief"
129+ * containing P(x,,i,, = +1), the marginal probability of vertex i taking value +1 instead of
130+ * -1.
128131 */
129132 def runBPwithGraphX (g : GraphFrame , numIter : Int ): GraphFrame = {
130133 // Choose colors for vertices for BP scheduling.
@@ -166,15 +169,14 @@ object BeliefPropagation {
166169 },
167170 _ + _)
168171 // Receive messages, and update beliefs for vertices of the current color.
169- gx = gx.outerJoinVertices(msgs) {
170- case (vID, vAttr, optMsg) =>
171- if (vAttr.color == color) {
172- val x = vAttr.a + optMsg.getOrElse(0.0 )
173- val newBelief = math.exp(- log1pExp(- x))
174- VertexAttr (vAttr.a, newBelief, color)
175- } else {
176- vAttr
177- }
172+ gx = gx.outerJoinVertices(msgs) { case (vID, vAttr, optMsg) =>
173+ if (vAttr.color == color) {
174+ val x = vAttr.a + optMsg.getOrElse(0.0 )
175+ val newBelief = math.exp(- log1pExp(- x))
176+ VertexAttr (vAttr.a, newBelief, color)
177+ } else {
178+ vAttr
179+ }
178180 }
179181 }
180182 }
@@ -192,16 +194,19 @@ object BeliefPropagation {
192194 * Run Belief Propagation.
193195 *
194196 * This implementation of BP shows how to use GraphFrame's aggregateMessages method.
195- * - Color GraphFrame vertices for BP scheduling.
196- * - Run BP using GraphFrame's aggregateMessages API.
197- * - Augment the original GraphFrame with the BP results (vertex beliefs).
197+ * - Color GraphFrame vertices for BP scheduling.
198+ * - Run BP using GraphFrame's aggregateMessages API.
199+ * - Augment the original GraphFrame with the BP results (vertex beliefs).
198200 *
199- * @param g Graphical model created by `org.graphframes.examples.Graphs.gridIsingModel()`
200- * @param numIter Number of iterations of BP to run. One iteration includes updating each
201- * vertex's belief once.
202- * @return Same graphical model, but with [[GraphFrame.vertices ]] augmented with a new column
203- * "belief" containing P(x,,i,, = +1), the marginal probability of vertex i taking
204- * value +1 instead of -1.
201+ * @param g
202+ * Graphical model created by `org.graphframes.examples.Graphs.gridIsingModel()`
203+ * @param numIter
204+ * Number of iterations of BP to run. One iteration includes updating each vertex's belief
205+ * once.
206+ * @return
207+ * Same graphical model, but with [[GraphFrame.vertices ]] augmented with a new column "belief"
208+ * containing P(x,,i,, = +1), the marginal probability of vertex i taking value +1 instead of
209+ * -1.
205210 */
206211 def runBPwithGraphFrames (g : GraphFrame , numIter : Int ): GraphFrame = {
207212 // Choose colors for vertices for BP scheduling.
@@ -231,15 +236,16 @@ object BeliefPropagation {
231236 .agg(sum(AM .msg).as(" aggMess" ))
232237 val v = gx.vertices
233238 // Receive messages, and update beliefs for vertices of the current color.
234- val newBeliefCol = when(v(" color" ) === color && aggregates(" aggMess" ).isNotNull,
239+ val newBeliefCol = when(
240+ v(" color" ) === color && aggregates(" aggMess" ).isNotNull,
235241 logistic(aggregates(" aggMess" ) + v(" a" )))
236- .otherwise(v(" belief" )) // keep old beliefs for other colors
242+ .otherwise(v(" belief" )) // keep old beliefs for other colors
237243 val newVertices = v
238- .join(aggregates, v(" id" ) === aggregates(" id" ), " left_outer" ) // join messages, vertices
239- .drop(aggregates(" id" )) // drop duplicate ID column (from outer join)
240- .withColumn(" newBelief" , newBeliefCol) // compute new beliefs
241- .drop(" aggMess" ) // drop messages
242- .drop(" belief" ) // drop old beliefs
244+ .join(aggregates, v(" id" ) === aggregates(" id" ), " left_outer" ) // join messages, vertices
245+ .drop(aggregates(" id" )) // drop duplicate ID column (from outer join)
246+ .withColumn(" newBelief" , newBeliefCol) // compute new beliefs
247+ .drop(" aggMess" ) // drop messages
248+ .drop(" belief" ) // drop old beliefs
243249 .withColumnRenamed(" newBelief" , " belief" )
244250 // Cache new vertices using workaround for SPARK-13346
245251 val cachedNewVertices = AM .getCachedDataFrame(newVertices)
0 commit comments