執(zhí)行環(huán)境
創(chuàng)建一個執(zhí)行環(huán)境陆盘,表示當(dāng)前執(zhí)行程序的上下文,如果程序是獨立調(diào)用的則此方法返回本地執(zhí)行環(huán)境败明,如果從命令行客戶端調(diào)用程序提交到集群隘马,則此方法返回此集群的執(zhí)行環(huán)境,也就是是說妻顶,getExecutionEnvironment(),會根據(jù)查詢運(yùn)行的方式?jīng)Q定返回什么樣的運(yùn)行環(huán)境祟霍。是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。
# 流處理環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
#批處理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
1.讀取的數(shù)據(jù)源-Elements
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 2, 5, 44);
2.讀取的數(shù)據(jù)源-Collection
public class SensorReading {
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
DataStreamSource<SensorReading> sensorDataStream = env.fromCollection(Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.8),
new SensorReading("sensor_7", 1547718202L, 37.8),
new SensorReading("sensor_10", 1547718203L, 38.8)
));
3.讀取的數(shù)據(jù)源-File
//從文件讀取
DataStreamSource<String> stringDataStreamSource = env.readTextFile("/path/to/hello.txt");
4.從socket文本流讀取數(shù)據(jù)
DataStream<String> inputDataStream = env.socketTextStream(localhost,7777);
最終執(zhí)行
sensorDataStream.print("sensor");
integerDataStreamSource.print("int");
stringDataStreamSource.print("file");
env.execute("my job name");