pojo代碼
package com.test.pojo;
public class WordCount {
public String word;
public int count;
public WordCount(String word, int count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
package com.test;
import com.test.pojp.WordCount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
@SuppressWarnings("serial")
public class Flink01 {
public static void main(String[] args) {
final String hostname;
final int port;
System.out.println("-----------aaaaaaaaaaaaaaaaaaa--");
try {
final ParameterTool parms = ParameterTool.fromArgs(args);
hostname=parms.has("hostname")?parms.get("hostname"):"localhost";
port=parms.getInt("port");
System.out.println(port+"-------------"+hostname);
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount " +
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
"type the input text into the command line");
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> str=env.socketTextStream(hostname,port);
DataStream<WordCount> wordcounts=str.flatMap(new FlatMapFunction<String, WordCount>() {
public void flatMap(String s, Collector<WordCount> collector) throws Exception {
for(String word:s.split("\\s")){
collector.collect(new WordCount(word,1));
}
}
}).keyBy("word").timeWindow(Time.seconds(5)).reduce(new ReduceFunction<WordCount>() {
public WordCount reduce(WordCount wordCount, WordCount t1) throws Exception {
return new WordCount(wordCount.word,wordCount.count+t1.count);
}
});
wordcounts.print().setParallelism(1);
try {
env.execute("socket wordcount window");
} catch (Exception e) {
e.printStackTrace();
}
}
}
pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
異常log_1
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
at org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:643)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:206)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:140)
at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:799)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:196)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
解決方法如下戴甩,打包方式要調(diào)整,去掉圖中標(biāo)黃部分
異常記錄2
The program finished with the following exception:
This type (GenericType<com.test.pojp.WordCount>) cannot be used as key.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:337)
com.test.Flink01.main(Flink01.java:40)
上面的error原因是pojo對(duì)象沒(méi)有添加無(wú)參構(gòu)造
添加這行代碼重新編包即可
package com.test.pojp;
public class WordCount {
public String word;
public int count;
public WordCount(String word, int count) {
this.word = word;
this.count = count;
}
public WordCount(){};//就是這行
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}