Skip to content

Commit 6deb773

Browse files
author
Abul Basar
committed
update
1 parent dfb5c0f commit 6deb773

File tree

1 file changed

+122
-0
lines changed

1 file changed

+122
-0
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package com.example.df;
2+
3+
import org.apache.spark.SparkConf;
4+
import org.apache.spark.sql.Dataset;
5+
import org.apache.spark.sql.Encoders;
6+
import org.apache.spark.sql.Row;
7+
import org.apache.spark.sql.SparkSession;
8+
import org.apache.spark.sql.api.java.UDF3;
9+
import org.apache.spark.sql.types.DataTypes;
10+
11+
import java.util.Arrays;
12+
import java.util.List;
13+
import java.util.regex.Pattern;
14+
15+
public class DFApp4 {
16+
17+
static class WordDistanceUdf implements UDF3<String, String, String, Integer>{
18+
19+
private Pattern wordSep;
20+
21+
public WordDistanceUdf(){
22+
wordSep = Pattern.compile("\\s+");
23+
}
24+
25+
@Override
26+
public Integer call(String text, String word1, String word2) throws Exception {
27+
Integer result = null;
28+
if(!(text == null || word1 == null || word2 == null)){
29+
if(word1.equals(word2)){
30+
result = 0;
31+
}else { // 1. fast fast slow 2. fast fast fast
32+
final String[] split = wordSep.split(text);
33+
for (int i = 0; i < split.length; i++) {
34+
if (split[i].equals(word1)) {
35+
for (int j = 0; j < split.length; j++) {
36+
if (split[j].equals(word2)) {
37+
int d = Math.abs(i - j) - 1;
38+
result = Math.min(result, d);
39+
break;
40+
}
41+
}
42+
}
43+
}
44+
}
45+
}
46+
return result;
47+
}
48+
}
49+
50+
51+
public void start(String ...args){
52+
SparkConf conf = new SparkConf();
53+
conf.setAppName(getClass().getName());
54+
conf.setMaster("local[4]"); // 4 executor threads
55+
conf.setIfMissing("spark.master", "local[4]");
56+
conf.setIfMissing("spark.default.parallelism", "16");
57+
conf.set("spark.hadoop.validateOutputSpecs", "false");
58+
conf.set("spark.eventLog.enabled", "true");
59+
conf.set("spark.eventLog.dir", "/tmp/spark-events-logs");
60+
61+
final SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
62+
63+
/*
64+
sparkSession.udf().register("word_distance", (String text, String word1, String word2) -> {
65+
Integer result = null;
66+
if(!(text == null || word1 == null || word2 == null)){
67+
if(word1.equals(word2)){
68+
result = 0;
69+
}else {
70+
final String[] split = text.split("\\s+");
71+
for (int i = 0; i < split.length; i++) {
72+
if (split[i].equals(word1)) {
73+
for (int j = 0; j < split.length; j++) {
74+
if (split[j].equals(word2)) {
75+
result = Math.abs(i - j) - 1;
76+
break;
77+
}
78+
}
79+
}
80+
}
81+
}
82+
}
83+
return result;
84+
}, DataTypes.IntegerType);
85+
*/
86+
87+
sparkSession.udf().register("word_distance", new WordDistanceUdf(), DataTypes.IntegerType);
88+
89+
90+
final String[] testStrings = new String[]{
91+
"Fast text searching for regular expressions or automaton searching on tries",
92+
null,
93+
"hello world"
94+
};
95+
96+
final Dataset<Row> dataset = sparkSession
97+
.createDataset(Arrays.asList(testStrings), Encoders.STRING())
98+
.toDF("value")
99+
;
100+
101+
final List<Integer> distances = dataset
102+
.selectExpr("word_distance(value, 'text', 'expressions')")
103+
.as(Encoders.INT())
104+
.collectAsList();
105+
106+
107+
assert distances.get(0) == 3;
108+
assert distances.get(1) == null;
109+
assert distances.get(2) == null;
110+
111+
sparkSession.close();
112+
113+
114+
}
115+
116+
117+
public static void main(String ...args){
118+
new DFApp4().start(args);
119+
}
120+
121+
122+
}

0 commit comments

Comments
 (0)