Skip to content

Commit 1a6f1a5

Browse files
authored
Merge pull request DTStack#22 from DocLi/v1.6.0
V1.6.0
2 parents 31cceda + b72adbd commit 1a6f1a5

File tree

8 files changed

+765
-92
lines changed

8 files changed

+765
-92
lines changed

core/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@
5656
<artifactId>flink-streaming-scala_2.11</artifactId>
5757
<version>${flink.version}</version>
5858
</dependency>
59+
60+
<dependency>
61+
<groupId>org.apache.flink</groupId>
62+
<artifactId>flink-shaded-hadoop2</artifactId>
63+
<version>${flink.version}</version>
64+
</dependency>
65+
66+
<dependency>
67+
<groupId>org.apache.flink</groupId>
68+
<artifactId>flink-yarn_2.11</artifactId>
69+
<version>${flink.version}</version>
70+
</dependency>
5971
</dependencies>
6072

6173
<build>

core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
100100
Configuration configuration = new Configuration();
101101
configuration.addAll(jobGraph.getJobConfiguration());
102102

103-
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
103+
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "-1L");
104104
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
105105

106106
// add (and override) the settings with what the user defined
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright (C) 2018 The Sylph Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.dtstack.flink.yarn;
17+
18+
import java.util.Objects;
19+
import java.util.Properties;
20+
21+
public class JobParameter
22+
{
23+
private int parallelism = 1;
24+
private String queue = "default";
25+
private int taskManagerMemoryMb = 1024;
26+
private int taskManagerCount = 1;
27+
private int taskManagerSlots = 1;
28+
private int jobManagerMemoryMb = 1024;
29+
30+
public JobParameter() {}
31+
32+
public JobParameter(Properties confProperties) {
33+
this.parallelism = confProperties.getProperty("parallelism")==null?parallelism:Integer.parseInt(confProperties.getProperty("parallelism"));
34+
this.queue = confProperties.getProperty("queue")==null?queue:confProperties.getProperty("queue");
35+
this.taskManagerMemoryMb = confProperties.getProperty("taskManagerMemoryMb")==null?taskManagerMemoryMb:Integer.parseInt(confProperties.getProperty("taskManagerMemoryMb"));
36+
this.taskManagerCount = confProperties.getProperty("taskManagerCount")==null?taskManagerCount:Integer.parseInt(confProperties.getProperty("taskManagerCount"));
37+
this.taskManagerSlots = confProperties.getProperty("taskManagerSlots")==null?taskManagerSlots:Integer.parseInt(confProperties.getProperty("taskManagerSlots"));
38+
this.jobManagerMemoryMb = confProperties.getProperty("jobManagerMemoryMb")==null?jobManagerMemoryMb:Integer.parseInt(confProperties.getProperty("jobManagerMemoryMb"));
39+
}
40+
41+
public JobParameter(int parallelism, String queue, int taskManagerMemoryMb, int taskManagerCount, int taskManagerSlots, int jobManagerMemoryMb) {
42+
this.parallelism = parallelism;
43+
this.queue = queue;
44+
this.taskManagerMemoryMb = taskManagerMemoryMb;
45+
this.taskManagerCount = taskManagerCount;
46+
this.taskManagerSlots = taskManagerSlots;
47+
this.jobManagerMemoryMb = jobManagerMemoryMb;
48+
}
49+
50+
public void setQueue(String queue)
51+
{
52+
this.queue = queue;
53+
}
54+
55+
public void setTaskManagerCount(int taskManagerCount)
56+
{
57+
this.taskManagerCount = taskManagerCount;
58+
}
59+
60+
public void setTaskManagerMemoryMb(int taskManagerMemoryMb)
61+
{
62+
this.taskManagerMemoryMb = taskManagerMemoryMb;
63+
}
64+
65+
public void setTaskManagerSlots(int taskManagerSlots)
66+
{
67+
this.taskManagerSlots = taskManagerSlots;
68+
}
69+
70+
public void setJobManagerMemoryMb(int jobManagerMemoryMb)
71+
{
72+
this.jobManagerMemoryMb = jobManagerMemoryMb;
73+
}
74+
75+
public void setParallelism(int parallelism)
76+
{
77+
this.parallelism = parallelism;
78+
}
79+
80+
public int getParallelism()
81+
{
82+
return parallelism;
83+
}
84+
85+
public String getQueue()
86+
{
87+
return queue;
88+
}
89+
90+
public int getJobManagerMemoryMb()
91+
{
92+
return jobManagerMemoryMb;
93+
}
94+
95+
public int getTaskManagerSlots()
96+
{
97+
return taskManagerSlots;
98+
}
99+
100+
public int getTaskManagerCount()
101+
{
102+
return taskManagerCount;
103+
}
104+
105+
public int getTaskManagerMemoryMb()
106+
{
107+
return taskManagerMemoryMb;
108+
}
109+
110+
@Override
111+
public boolean equals(Object o)
112+
{
113+
if (this == o) {
114+
return true;
115+
}
116+
if (o == null || getClass() != o.getClass()) {
117+
return false;
118+
}
119+
JobParameter jobParameter = (JobParameter) o;
120+
return Objects.equals(this.queue, jobParameter.queue) &&
121+
Objects.equals(this.taskManagerCount, jobParameter.taskManagerCount) &&
122+
Objects.equals(this.taskManagerMemoryMb, jobParameter.taskManagerMemoryMb);
123+
}
124+
125+
@Override
126+
public int hashCode()
127+
{
128+
return Objects.hash(queue, taskManagerMemoryMb, taskManagerCount);
129+
}
130+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (C) 2018 The Sylph Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.dtstack.flink.yarn;
17+
18+
import org.apache.flink.configuration.Configuration;
19+
import org.apache.hadoop.fs.Path;
20+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
21+
22+
import java.util.Set;
23+
24+
public class YarnClusterConfiguration {
25+
/**
26+
* The configuration used by YARN (i.e., <pre>yarn-site.xml</pre>).
27+
*/
28+
private final YarnConfiguration yarnConf;
29+
30+
/**
31+
* The home directory of all job where all the temporary files for each jobs are stored.
32+
*/
33+
private final String appRootDir;
34+
35+
/**
36+
* The location of the Flink jar.
37+
*/
38+
private final Path flinkJar;
39+
40+
/**
41+
* Additional resources to be localized for both JobManager and TaskManager.
42+
* They will NOT be added into the classpaths.
43+
*/
44+
private final Set<Path> resourcesToLocalize;
45+
46+
/**
47+
* flink conf
48+
*/
49+
private final Configuration flinkConfiguration;
50+
51+
public YarnClusterConfiguration(
52+
Configuration flinkConf,
53+
YarnConfiguration conf,
54+
String appRootDir,
55+
Path flinkJar,
56+
Set<Path> resourcesToLocalize) {
57+
this.flinkConfiguration = flinkConf;
58+
this.yarnConf = conf;
59+
this.appRootDir = appRootDir;
60+
this.flinkJar = flinkJar;
61+
this.resourcesToLocalize = resourcesToLocalize;
62+
}
63+
64+
YarnConfiguration yarnConf() {
65+
return yarnConf;
66+
}
67+
68+
public String appRootDir() {
69+
return appRootDir;
70+
}
71+
72+
public Configuration flinkConfiguration() {
73+
return flinkConfiguration;
74+
}
75+
76+
public Path flinkJar() {
77+
return flinkJar;
78+
}
79+
80+
public Set<Path> resourcesToLocalize() {
81+
return resourcesToLocalize;
82+
}
83+
84+
}

0 commit comments

Comments
 (0)