代码之家  ›  专栏  ›  技术社区  ›  Manish Saraf Bhardwaj

Spark结构化流式Elasticsearch集成问题

  •  0
  • Manish Saraf Bhardwaj  · 技术社区  · 6 年前

    我正在写一个Spark结构化流式应用程序,其中使用Spark处理的数据需要接收到弹性搜索。 Spark版本2.3.0.cloudera4

    我用spark shell

    spark2-shell --jars /tmp/elasticsearch-hadoop-2.3.2/dist/elasticsearch-hadoop-2.3.2.jar
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, TimestampType};
    import java.util.Calendar
    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark.sql
    import sys.process._
    
    val checkPointDir =  "/tmp/rt/checkpoint/"
    
    val spark = SparkSession.builder
    .config("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    .config("fs.s3n.awsAccessKeyId","aaabbb")
    .config("fs.s3n.awsSecretAccessKey","aaabbbccc")
    .config("spark.sql.streaming.checkpointLocation",s"$checkPointDir")
    .config("es.index.auto.create", "true").getOrCreate()
    import spark.implicits._
    
    val requestSchema = new StructType().add("log_type", StringType).add("time_stamp", StringType).add("host_name", StringType).add("data_center", StringType).add("build", StringType).add("ip_trace", StringType).add("client_ip", StringType).add("protocol", StringType).add("latency", StringType).add("status", StringType).add("response_size", StringType).add("request_id", StringType).add("user_id", StringType).add("pageview_id", StringType).add("impression_id", StringType).add("source_impression_id", StringType).add("rnd", StringType).add("publisher_id", StringType).add("site_id", StringType).add("zone_id", StringType).add("slot_id", StringType).add("tile", StringType).add("content_id", StringType).add("post_id", StringType).add("postgroup_id", StringType).add("brand_id", StringType).add("provider_id", StringType).add("geo_country", StringType).add("geo_region", StringType).add("geo_city", StringType).add("geo_zip_code", StringType).add("geo_area_code", StringType).add("geo_dma_code", StringType).add("browser_group", StringType).add("page_url", StringType).add("document_referer", StringType).add("user_agent", StringType).add("cookies", StringType).add("kvs", StringType).add("notes", StringType).add("request", StringType)
    val requestDF = spark.readStream.option("delimiter", "\t").format("com.databricks.spark.csv").schema(requestSchema).load("s3n://aa/logs/cc.com/r/year=" + Calendar.getInstance().get(Calendar.YEAR) + "/month=" + "%02d".format(Calendar.getInstance().get(Calendar.MONTH)+1) + "/day=" + "%02d".format(Calendar.getInstance().get(Calendar.DAY_OF_MONTH)) + "/hour=" + "%02d".format(Calendar.getInstance().get(Calendar.HOUR_OF_DAY)) + "/*.log")
    requestDF.writeStream.format("org.elasticsearch.spark.sql").option("es.resource", "rt_request/doc").option("es.nodes", "localhost").outputMode("Append").start()
    

    我尝试了以下两种方法将数据集中到ES。 1ds.writeStream公司()。格式(“org.elasticsearch.spark网站.sql“.start(“rt_请求/doc”); 2ds.writeStream公司().格式(“es”).start(“rt_请求/doc”); 在这两种情况下,我都会得到以下错误:

    原因: java.lang.UnsupportedOperationException:数据源es不支持流式写入

    java.lang.UnsupportedOperationException:数据源org.elasticsearch.spark网站.sql不支持流式写入 在org.apache.spark网站.sql.streaming.DataStreamWriter.启动(DataStreamWriter.scala:293)

    0 回复  |  直到 6 年前
        1
  •  0
  •   Manish Saraf Bhardwaj    6 年前

    我使用的eshadoopjar版本是旧的elasticsearch-hadoop-2.3.2.jar。我们需要6个或以上。

    我已经从 https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-7.1.1.zip