Skip to content

Commit 6cc0829

Browse files
authored
Merge pull request #1 from simenliuxing/v1.9_zhaoshang
V1.9 zhaoshang
2 parents 2d86ee2 + 2d4360a commit 6cc0829

File tree

17 files changed

+181
-811
lines changed

17 files changed

+181
-811
lines changed

core/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@
6767
</dependency>
6868

6969
<dependency>
70-
<groupId>org.apache.flink</groupId>
71-
<artifactId>flink-yarn_2.11</artifactId>
72-
<version>${flink.version}</version>
70+
<groupId>commons-codec</groupId>
71+
<artifactId>commons-codec</artifactId>
72+
<version>1.10</version>
7373
</dependency>
7474

7575
<dependency>

core/src/main/java/com/dtstack/flink/sql/enums/ClusterMode.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,14 @@
2424
*/
2525
public enum ClusterMode {
2626

27-
local(0),standalone(1),yarn(2),yarnPer(3);
27+
//run in local
28+
local(0),
29+
//submit job to standalone cluster
30+
standalone(1),
31+
//submit job to flink-session which is already run on yarn
32+
yarn(2),
33+
//submit job to yarn cluster as an application
34+
yarnPer(3);
2835

2936
private int type;
3037

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,4 +277,9 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
277277
}
278278
}
279279
}
280+
281+
@Override
282+
public TypeInformation<Row> getProducedType() {
283+
return typeInfo;
284+
}
280285
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
281281
}
282282
}
283283

284-
284+
@Override
285+
public TypeInformation<Row> getProducedType() {
286+
return typeInfo;
287+
}
285288

286289
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@
5353
import java.util.Map;
5454
import java.util.Set;
5555

56-
import static com.dtstack.flink.sql.metric.MetricConstant.*;
56+
import static com.dtstack.flink.sql.metric.MetricConstant.DT_PARTITION_GROUP;
57+
import static com.dtstack.flink.sql.metric.MetricConstant.DT_TOPIC_GROUP;
58+
import static com.dtstack.flink.sql.metric.MetricConstant.DT_TOPIC_PARTITION_LAG_GAUGE;
5759

5860
/**
5961
* json string parsing custom
@@ -283,4 +285,10 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
283285
}
284286
}
285287
}
288+
289+
@Override
290+
public TypeInformation<Row> getProducedType() {
291+
return typeInfo;
292+
}
293+
286294
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,11 @@ public DataStreamSink<Row> consumeDataStream(DataStream<Tuple2<Boolean, Row>> da
125125
serializationSchema
126126
);
127127

128-
DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
129-
return record.f1;
130-
}).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
128+
DataStream<Row> ds = dataStream
129+
.filter((Tuple2<Boolean, Row> record) -> record.f0)
130+
.map((Tuple2<Boolean, Row> record) -> {return record.f1;})
131+
.returns(getOutputType().getTypeAt(1))
132+
.setParallelism(parallelism);
131133

132134
DataStreamSink<Row> dataStreamSink = (DataStreamSink<Row>) kafkaTableSink.consumeDataStream(ds);
133135
return dataStreamSink;

launcher/pom.xml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,42 @@
2222
<groupId>org.apache.flink</groupId>
2323
<artifactId>flink-yarn_2.11</artifactId>
2424
<version>${flink.version}</version>
25+
<exclusions>
26+
<exclusion>
27+
<groupId>org.apache.flink</groupId>
28+
<artifactId>flink-shaded-hadoop-2</artifactId>
29+
</exclusion>
30+
</exclusions>
31+
</dependency>
32+
33+
<dependency>
34+
<groupId>org.apache.hadoop</groupId>
35+
<artifactId>hadoop-common</artifactId>
36+
<version>${hadoop.version}</version>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>org.apache.hadoop</groupId>
41+
<artifactId>hadoop-hdfs</artifactId>
42+
<version>${hadoop.version}</version>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>org.apache.hadoop</groupId>
47+
<artifactId>hadoop-yarn-common</artifactId>
48+
<version>${hadoop.version}</version>
49+
</dependency>
50+
51+
<dependency>
52+
<groupId>org.apache.hadoop</groupId>
53+
<artifactId>hadoop-yarn-client</artifactId>
54+
<version>${hadoop.version}</version>
55+
</dependency>
56+
57+
<dependency>
58+
<groupId>org.apache.hadoop</groupId>
59+
<artifactId>hadoop-mapreduce-client-core</artifactId>
60+
<version>${hadoop.version}</version>
2561
</dependency>
2662

2763
<dependency>

0 commit comments

Comments (0)