forked from awsdocs/aws-doc-sdk-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkda-java-streams.java
More file actions
103 lines (79 loc) · 4.6 KB
/
Copy pathkda-java-streams.java
File metadata and controls
103 lines (79 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// This file is licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License. A copy of
// the License is located at
//
// http://aws.amazon.com/apache2.0/
//
// This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
// snippet-sourcedescription:[kinesisanalytics.java.example.streams-sink demonstrates how to send data from a Kinesis Data Analytics application to a Kinesis Streams Flink sink.]
// snippet-service:[kinesisanalytics]
// snippet-keyword:[java]
// snippet-keyword:[Amazon Kinesis Data Analytics]
// snippet-keyword:[Code Sample]
// snippet-keyword:[none]
// snippet-sourcetype:[full-example]
// snippet-sourcedate:[2019-01-29]
// snippet-sourceauthor:[AWS]
// snippet-start:[kinesisanalytics.java.example.streams-sink]
package com.amazonaws.services.kinesisanalytics;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
/**
 * Example Flink job that reads strings from one Kinesis stream and writes them, unchanged,
 * to another Kinesis stream.
 *
 * <p>The source and the sink can each be configured either from the constants in this class
 * or from the application's runtime property groups; {@link #main(String[])} uses the static
 * configuration and shows in comments how to switch to the runtime properties.
 */
public class StreamingJob {
    private static final String REGION = "us-east-1";
    private static final String INPUT_STREAM_NAME = "ExampleInputStream";
    private static final String OUTPUT_STREAM_NAME = "ExampleOutputStream";

    // Names of the runtime property groups looked up in the application properties.
    private static final String CONSUMER_PROPERTY_GROUP = "ConsumerConfigProperties";
    private static final String PRODUCER_PROPERTY_GROUP = "ProducerConfigProperties";

    /**
     * Builds a Kinesis source for {@code INPUT_STREAM_NAME} from the constants in this class.
     *
     * @param env the execution environment the source is registered with
     * @return a stream of the records, deserialized with {@link SimpleStringSchema}
     */
    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, REGION);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(
                new FlinkKinesisConsumer<>(INPUT_STREAM_NAME, new SimpleStringSchema(), inputProperties));
    }

    /**
     * Builds a Kinesis source for {@code INPUT_STREAM_NAME} whose consumer settings come from
     * the {@code ConsumerConfigProperties} runtime property group.
     *
     * @param env the execution environment the source is registered with
     * @return a stream of the records, deserialized with {@link SimpleStringSchema}
     * @throws IOException propagated from {@code KinesisAnalyticsRuntime.getApplicationProperties()}
     * @throws IllegalStateException if the {@code ConsumerConfigProperties} group is not defined
     */
    private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
            throws IOException {
        Properties consumerProperties = requirePropertyGroup(
                KinesisAnalyticsRuntime.getApplicationProperties(), CONSUMER_PROPERTY_GROUP);

        return env.addSource(
                new FlinkKinesisConsumer<>(INPUT_STREAM_NAME, new SimpleStringSchema(), consumerProperties));
    }

    /**
     * Builds a Kinesis sink for {@code OUTPUT_STREAM_NAME} from the constants in this class,
     * with the {@code AggregationEnabled} producer property set to {@code false}.
     *
     * @return the configured producer, using {@code "0"} as its default partition
     */
    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
        Properties outputProperties = new Properties();
        // NOTE(review): the region key is taken from ConsumerConfigConstants although this
        // configures a producer; presumably the key is shared by both connectors - confirm.
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, REGION);
        outputProperties.setProperty("AggregationEnabled", "false");

        FlinkKinesisProducer<String> sink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(OUTPUT_STREAM_NAME);
        sink.setDefaultPartition("0");
        return sink;
    }

    /**
     * Builds a Kinesis sink for {@code OUTPUT_STREAM_NAME} whose producer settings come from
     * the {@code ProducerConfigProperties} runtime property group.
     *
     * @return the configured producer, using {@code "0"} as its default partition
     * @throws IOException propagated from {@code KinesisAnalyticsRuntime.getApplicationProperties()}
     * @throws IllegalStateException if the {@code ProducerConfigProperties} group is not defined
     */
    private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
        Properties producerProperties = requirePropertyGroup(
                KinesisAnalyticsRuntime.getApplicationProperties(), PRODUCER_PROPERTY_GROUP);

        FlinkKinesisProducer<String> sink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerProperties);
        sink.setDefaultStream(OUTPUT_STREAM_NAME);
        sink.setDefaultPartition("0");
        return sink;
    }

    /**
     * Looks up a runtime property group and fails with a descriptive message when it is absent,
     * instead of handing {@code null} to a connector constructor.
     *
     * @param applicationProperties the property groups keyed by group name
     * @param groupName the name of the group to fetch
     * @return the properties of the requested group, never {@code null}
     * @throws IllegalStateException if the map is {@code null} or has no entry for {@code groupName}
     */
    private static Properties requirePropertyGroup(
            Map<String, Properties> applicationProperties, String groupName) {
        Properties group = applicationProperties == null ? null : applicationProperties.get(groupName);
        if (group == null) {
            throw new IllegalStateException(
                    "Runtime property group '" + groupName + "' is not defined for this application");
        }
        return group;
    }

    /**
     * Wires the Kinesis source to the Kinesis sink and runs the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // To use the runtime configuration properties for the source, replace the line below with:
        //   DataStream<String> input = createSourceFromApplicationProperties(env);
        DataStream<String> input = createSourceFromStaticConfig(env);

        // To use the runtime configuration properties for the sink, replace the line below with:
        //   input.addSink(createSinkFromApplicationProperties());
        input.addSink(createSinkFromStaticConfig());

        env.execute("Flink Streaming Java API Skeleton");
    }
}
// snippet-end:[kinesisanalytics.java.example.streams-sink]