Skip to content

Commit 020ff6f

Browse files
authored
Update StreamAppForeachSink.java
1 parent 2d07311 commit 020ff6f

File tree

1 file changed

+22
-8
lines changed

1 file changed

+22
-8
lines changed

src/main/java/com/example/StreamAppForeachSink.java

Lines changed: 22 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -22,10 +22,10 @@ public class StreamAppForeachSink {
2222

2323
public static void main(String[] args) throws AnalysisException,
2424
StreamingQueryException {
25-
26-
String checkpointDir = args[1];
25+
26+
String checkpointDir = args[0];
27+
2728

28-
ObjectMapper mapper = new ObjectMapper();
2929

3030
SparkConf conf = new SparkConf().setAppName(StreamAppForeachSink.class.getName())
3131
.setIfMissing("spark.master", "local[*]")
@@ -37,8 +37,23 @@ public static void main(String[] args) throws AnalysisException,
3737
Dataset<String> rawStream = spark.readStream().format("socket")
3838
.option("host", "localhost").option("port", "9999").load()
3939
.as(Encoders.STRING());
40+
41+
rawStream.printSchema();
42+
System.out.println("Is streaming: " + rawStream.isStreaming());
43+
44+
45+
46+
Dataset<Row> tweets = rawStream
47+
.select(functions.from_json(functions.col("value"), Encoders.bean(Tweet.class).schema()).as("root"))
48+
.selectExpr("root.*")
49+
.withColumn("timestamp", functions.current_timestamp());
50+
51+
System.out.println("Schema of tweets streaming dataset");
52+
tweets.printSchema();
4053

41-
Dataset<Row> tweets = rawStream.mapPartitions(
54+
/*
55+
ObjectMapper mapper = new ObjectMapper();
56+
Dataset<Row> tweets = rawStream.mapPartitions(
4257
items -> {
4358
List<Tweet> tweetsIters = new ArrayList<Tweet>();
4459
while (items.hasNext()) {
@@ -51,10 +66,8 @@ public static void main(String[] args) throws AnalysisException,
5166
}
5267
return tweetsIters.iterator();
5368
}, Encoders.bean(Tweet.class)).withColumn("timestamp",
54-
functions.current_timestamp());
69+
functions.current_timestamp());*/
5570

56-
System.out.println("Is streaming: " + rawStream.isStreaming());
57-
tweets.printSchema();
5871

5972
Dataset<Row> tagsAgg = tweets.withColumn(
6073
"tag",
@@ -65,11 +78,12 @@ public static void main(String[] args) throws AnalysisException,
6578
.count();
6679

6780
tagsAgg.writeStream()
68-
.outputMode(OutputMode.Complete())
81+
.outputMode(OutputMode.Update())
6982
.trigger(Trigger.ProcessingTime("5 seconds"))
7083
.foreach(new CustomForEachSink())
7184
.start();
7285

7386
spark.streams().awaitAnyTermination();
7487
}
7588
}
89+

0 commit comments

Comments
 (0)