@@ -22,10 +22,10 @@ public class StreamAppForeachSink {
2222
2323 public static void main (String [] args ) throws AnalysisException ,
2424 StreamingQueryException {
25-
26- String checkpointDir = args [1 ];
25+
26+ String checkpointDir = args [0 ];
27+
2728
28- ObjectMapper mapper = new ObjectMapper ();
2929
3030 SparkConf conf = new SparkConf ().setAppName (StreamAppForeachSink .class .getName ())
3131 .setIfMissing ("spark.master" , "local[*]" )
@@ -37,8 +37,23 @@ public static void main(String[] args) throws AnalysisException,
3737 Dataset <String > rawStream = spark .readStream ().format ("socket" )
3838 .option ("host" , "localhost" ).option ("port" , "9999" ).load ()
3939 .as (Encoders .STRING ());
40+
41+ rawStream .printSchema ();
42+ System .out .println ("Is streaming: " + rawStream .isStreaming ());
43+
44+
45+
46+ Dataset <Row > tweets = rawStream
47+ .select (functions .from_json (functions .col ("value" ), Encoders .bean (Tweet .class ).schema ()).as ("root" ))
48+ .selectExpr ("root.*" )
49+ .withColumn ("timestamp" , functions .current_timestamp ());
50+
51+ System .out .println ("Schema of tweets streaming dataset" );
52+ tweets .printSchema ();
4053
41- Dataset <Row > tweets = rawStream .mapPartitions (
54+ /*
55+ ObjectMapper mapper = new ObjectMapper();
56+ Dataset<Row> tweets = rawStream.mapPartitions(
4257 items -> {
4358 List<Tweet> tweetsIters = new ArrayList<Tweet>();
4459 while (items.hasNext()) {
@@ -51,10 +66,8 @@ public static void main(String[] args) throws AnalysisException,
5166 }
5267 return tweetsIters.iterator();
5368 }, Encoders.bean(Tweet.class)).withColumn("timestamp",
54- functions .current_timestamp ());
69+ functions.current_timestamp());*/
5570
56- System .out .println ("Is streaming: " + rawStream .isStreaming ());
57- tweets .printSchema ();
5871
5972 Dataset <Row > tagsAgg = tweets .withColumn (
6073 "tag" ,
@@ -65,11 +78,12 @@ public static void main(String[] args) throws AnalysisException,
6578 .count ();
6679
6780 tagsAgg .writeStream ()
68- .outputMode (OutputMode .Complete ())
81+ .outputMode (OutputMode .Update ())
6982 .trigger (Trigger .ProcessingTime ("5 seconds" ))
7083 .foreach (new CustomForEachSink ())
7184 .start ();
7285
7386 spark .streams ().awaitAnyTermination ();
7487 }
7588}
89+
0 commit comments