我正在尝试将我的flink工作部署到AWS EMR上(版本5.15和flink 1.4.2)但是,我无法从流中获取任何输出。
我试着创造一个简单的工作:
object StreamingJob1 {
def main(args: Array[String]) {
val path = args(0)
val file_input_format = new TextInputFormat(
new org.apache.flink.core.fs.Path(path))
file_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
file_input_format.setNestedFileEnumeration(true)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val myStream: DataStream[String] =
env.readFile(file_input_format,
path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
1000L)
.map(s => s.split(",").toString)
myStream.print()
// execute program
env.execute("Flink Streaming Scala")
}
}
我用下面的命令执行了它:
hadoop_conf_dir=/etc/hadoop/conf;flink run-m yarn cluster-yn 4-c my.pkg.streamingjob1/home/hadoop/flink-test-0.1.jar hdfs:///user/hadoop/data/
没有错误,但屏幕上除了弗林克的信息日志没有输出。
我试图输出到一个动觉流,或一个s3文件。没有任何记录。
myStream.addSink(new BucketingSink[String](output_path))
我还试图写入HDFS文件在本例中,创建了一个文件,但大小为0。
我确信已使用简单的检查处理了输入文件:
myStream.map(s => {"abc".toInt})
产生了一个异常。
我错过了什么?