Skip to content

Commit a98397f

Browse files
author
jerryye
committed
added PigWeightedPagerank
1 parent 5a2c7a0 commit a98397f

File tree

2 files changed

+167
-1
lines changed

2 files changed

+167
-1
lines changed

src/main/java/edu/cmu/graphchi/apps/WeightedPagerank.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public Float receiveVertexValue(int vertexId, String token) {
8585
}
8686
}, new EdgeProcessor<FloatPair>() {
8787
public FloatPair receiveEdge(int from, int to, String token) {
88-
return (new FloatPair(Float.parseFloat(token), 0.f));
88+
return new FloatPair(Float.parseFloat(token), 0.f);
8989
}
9090
}, new FloatConverter(), new FloatPairConverter());
9191
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package edu.cmu.graphchi.apps.pig;
2+
3+
import edu.cmu.graphchi.ChiVertex;
4+
import edu.cmu.graphchi.GraphChiContext;
5+
import edu.cmu.graphchi.GraphChiProgram;
6+
import edu.cmu.graphchi.datablocks.FloatConverter;
7+
import edu.cmu.graphchi.datablocks.FloatPair;
8+
import edu.cmu.graphchi.datablocks.FloatPairConverter;
9+
import edu.cmu.graphchi.engine.GraphChiEngine;
10+
import edu.cmu.graphchi.engine.VertexInterval;
11+
import edu.cmu.graphchi.hadoop.PigGraphChiBase;
12+
import edu.cmu.graphchi.preprocessing.EdgeProcessor;
13+
import edu.cmu.graphchi.preprocessing.FastSharder;
14+
import edu.cmu.graphchi.preprocessing.VertexProcessor;
15+
import edu.cmu.graphchi.vertexdata.VertexAggregator;
16+
import edu.cmu.graphchi.vertexdata.VertexIdValue;
17+
import org.apache.pig.backend.executionengine.ExecException;
18+
import org.apache.pig.data.Tuple;
19+
import org.apache.pig.data.TupleFactory;
20+
21+
import java.io.IOException;
22+
import java.util.Iterator;
23+
import java.util.logging.Logger;
24+
25+
/**
26+
* Example application: PageRank (http://en.wikipedia.org/wiki/Pagerank)
27+
* Iteratively computes a pagerank for each vertex by averaging the pageranks
28+
* of in-neighbors pageranks.
29+
*
30+
* This version can be used with <a href="http://pig.apache.org">Pig</a> in a Hadoop cluster.
31+
*
32+
* Example PIG script for running this:
33+
*
34+
* <pre>
35+
* REGISTER graphchi-java-0.2-jar-with-dependencies.jar;
36+
*
37+
* pagerank = LOAD 'graphs/soc-LiveJournal1.txt' USING edu.cmu.graphchi.demo.pig.PigPagerank as (vertex:int, rank:float);
38+
*
39+
* STORE pagerank INTO 'pagerank-livejournal';
40+
* </pre>
41+
*
42+
* (To get the livejournal graph, visit: http://snap.stanford.edu/data/soc-LiveJournal1.html)
43+
*
44+
* @see edu.cmu.graphchi.hadoop.PigGraphChiBase
45+
* @author Aapo Kyrola, [email protected]
46+
*/
47+
public class PigWeightedPagerank extends PigGraphChiBase implements GraphChiProgram<Float, FloatPair> {
48+
49+
private static Logger logger = Logger.getLogger("weighted_pagerank");
50+
51+
public void update(ChiVertex<Float, FloatPair> vertex, GraphChiContext context) {
52+
if (context.getIteration() == 0) {
53+
/* Initialize on first iteration */
54+
vertex.setValue(1.0f);
55+
} else {
56+
/* On other iterations, set my value to be the weighted
57+
average of my in-coming neighbors pageranks.
58+
*/
59+
float sum = 0.f;
60+
for(int i=0; i<vertex.numInEdges(); i++) {
61+
sum += vertex.inEdge(i).getValue().second;
62+
}
63+
vertex.setValue(0.15f + 0.85f * sum);
64+
}
65+
66+
/* Accumulate edge weights */
67+
float edgeWeightSum = 0.f;
68+
for(int i=0; i<vertex.numOutEdges(); i++) {
69+
edgeWeightSum += vertex.outEdge(i).getValue().first;
70+
}
71+
72+
/* Write my value (divided by my out-degree) to my out-edges so neighbors can read it. */
73+
for(int i=0; i<vertex.numOutEdges(); i++) {
74+
FloatPair curValue = vertex.outEdge(i).getValue();
75+
float edgeWeight = vertex.outEdge(i).getValue().first;
76+
curValue.second = vertex.getValue() * edgeWeight/edgeWeightSum;
77+
vertex.outEdge(i).setValue(curValue);
78+
}
79+
80+
}
81+
82+
/**
83+
* Callbacks (not needed for Pagerank)
84+
*/
85+
public void beginIteration(GraphChiContext ctx) {}
86+
public void endIteration(GraphChiContext ctx) {}
87+
public void beginInterval(GraphChiContext ctx, VertexInterval interval) {}
88+
public void endInterval(GraphChiContext ctx, VertexInterval interval) {}
89+
public void beginSubInterval(GraphChiContext ctx, VertexInterval interval) {}
90+
public void endSubInterval(GraphChiContext ctx, VertexInterval interval) {}
91+
92+
93+
/**
94+
* PIG integration
95+
*/
96+
97+
// Objects needed for iterating the results
98+
private Iterator<VertexIdValue<Float>> vertexIterator;
99+
100+
101+
@Override
102+
/**
103+
* Pig column names
104+
*/
105+
protected String getSchemaString() {
106+
return "(vertex:int, weight:float)";
107+
}
108+
109+
@Override
110+
protected int getNumShards() {
111+
return 12; // Unfortunately, currently hard-coded.
112+
}
113+
114+
@Override
115+
/**
116+
* Runs the GraphChi program
117+
*/
118+
protected void runGraphChi() throws Exception {
119+
/* Run GraphChi */
120+
GraphChiEngine<Float, FloatPair> engine = new GraphChiEngine<Float, FloatPair>(getGraphName(), getNumShards());
121+
engine.setEdataConverter(new FloatPairConverter());
122+
engine.setVertexDataConverter(new FloatConverter());
123+
engine.setModifiesInedges(false); // Important optimization
124+
125+
engine.run(this, 4);
126+
127+
logger.info("Ready.");
128+
129+
/* Create iterator for the vertex values */
130+
this.vertexIterator = VertexAggregator.vertexIterator(engine.numVertices(), getGraphName(), new FloatConverter(),
131+
engine.getVertexIdTranslate());
132+
}
133+
134+
@Override
135+
/**
136+
* Constructs "sharder", which takes an edge list and
137+
* converts it to internal binary representation of GraphChi.
138+
*/
139+
protected FastSharder createSharder(String graphName, int numShards) throws IOException {
140+
return new FastSharder<Float, FloatPair>(graphName, numShards, new VertexProcessor<Float>() {
141+
public Float receiveVertexValue(int vertexId, String token) {
142+
return (token == null ? 0.0f : Float.parseFloat(token));
143+
}
144+
}, new EdgeProcessor<FloatPair>() {
145+
public FloatPair receiveEdge(int from, int to, String token) {
146+
return new FloatPair(Float.parseFloat(token), 0.f);
147+
}
148+
}, new FloatConverter(), new FloatPairConverter());
149+
}
150+
151+
@Override
152+
/**
153+
* Generates the output to the Pig script, tuple by tuple
154+
*/
155+
protected Tuple getNextResult(TupleFactory tupleFactory) throws ExecException {
156+
if (vertexIterator.hasNext()) {
157+
Tuple t = tupleFactory.newTuple(2);
158+
VertexIdValue<Float> val = vertexIterator.next();
159+
t.set(0, val.getVertexId());
160+
t.set(1, val.getValue());
161+
return t;
162+
} else {
163+
return null;
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)