
How to use Jupyter, PySpark, and Cassandra together on a Google Cloud Dataproc cluster

  •  Jice  ·  7 years ago

    I'm trying to get these three tools working together on Google Cloud Platform. So I created a Dataproc Spark cluster with an initialization script that installs Cassandra and Jupyter.

    When I SSH into the cluster and run "pyspark --packages datastax:spark-cassandra-connector:2.3.0-s_2.11", everything seems to work fine.

    Edit: in fact, it is fine with spark-shell, but not with pyspark.

    I don't know how to launch Jupyter with a PySpark kernel and the Cassandra connector. Edit: the problem seems to be related to pyspark rather than to Jupyter.

    I tried modifying kernel.json:

        {
         "argv": [
            "bash",
            "-c",
            "PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='kernel -f {connection_file}' pyspark"],
         "env": {
            "PYSPARK_SUBMIT_ARGS": "--master local[*] pyspark-shell --packages datastax:spark-cassandra-connector:2.3.0-s_2.11"
         },
         "display_name": "PySpark",
         "language": "python"
        }
    

    But this doesn't seem to work. From Jupyter I can't find anything Cassandra-related, and I get exceptions such as:

    java.lang.ClassNotFoundException: Failed to find data source: pyspark.sql.cassandra.

    (I tried other PYSPARK_SUBMIT_ARGS values and adding --packages to PYSPARK_DRIVER_PYTHON_OPTS, but nothing had any effect.)

    Edit: when I start pyspark I get some warnings. I don't see anything related to my problem, but I may be wrong, so here is the pyspark startup output:

        myuserhome@spark-cluster-m:~$ pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0
        Python 2.7.9 (default, Jun 29 2016, 13:08:31) 
        [GCC 4.9.2] on linux2
        Type "help", "copyright", "credits" or "license" for more information.
        Ivy Default Cache set to: /home/myuserhome/.ivy2/cache
        The jars for the packages stored in: /home/myuserhome/.ivy2/jars
        :: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
        com.datastax.spark#spark-cassandra-connector_2.11 added as a dependency
        :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
                confs: [default]
                found com.datastax.spark#spark-cassandra-connector_2.11;2.3.0 in central
                found com.twitter#jsr166e;1.1.0 in central
                found commons-beanutils#commons-beanutils;1.9.3 in central
                found commons-collections#commons-collections;3.2.2 in central
                found joda-time#joda-time;2.3 in central
                found org.joda#joda-convert;1.2 in central
                found io.netty#netty-all;4.0.33.Final in central
                found org.scala-lang#scala-reflect;2.11.8 in central
        :: resolution report :: resolve 2615ms :: artifacts dl 86ms
                :: modules in use:
                com.datastax.spark#spark-cassandra-connector_2.11;2.3.0 from central in [default]
                com.twitter#jsr166e;1.1.0 from central in [default]
                commons-beanutils#commons-beanutils;1.9.3 from central in [default]
                commons-collections#commons-collections;3.2.2 from central in [default]
                io.netty#netty-all;4.0.33.Final from central in [default]
                joda-time#joda-time;2.3 from central in [default]
                org.joda#joda-convert;1.2 from central in [default]
                org.scala-lang#scala-reflect;2.11.8 from central in [default]
                ---------------------------------------------------------------------
                |                  |            modules            ||   artifacts   |
                |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
                ---------------------------------------------------------------------
                |      default     |   8   |   0   |   0   |   0   ||   8   |   0   |
                ---------------------------------------------------------------------
        :: retrieving :: org.apache.spark#spark-submit-parent
                confs: [default]
                0 artifacts copied, 8 already retrieved (0kB/76ms)
        Setting default log level to "WARN".
        To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
        18/06/17 11:08:22 WARN org.apache.hadoop.hdfs.DataStreamer: Caught exception
        java.lang.InterruptedException
                at java.lang.Object.wait(Native Method)
                at java.lang.Thread.join(Thread.java:1252)
                at java.lang.Thread.join(Thread.java:1326)
                at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:973)
                at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:624)
                at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:801)
        18/06/17 11:08:23 WARN org.apache.hadoop.hdfs.DataStreamer: Caught exception
        java.lang.InterruptedException
                at java.lang.Object.wait(Native Method)
                at java.lang.Thread.join(Thread.java:1252)
                at java.lang.Thread.join(Thread.java:1326)
                at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:973)
                at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:624)
                at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:801)
        18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/com.datastax.spark_spark-cassandra-connector_2.11-2.3.0.jar added multiple times to distributed cache.
        18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/com.twitter_jsr166e-1.1.0.jar added multiple times to distributed cache.
        18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/commons-beanutils_commons-beanutils-1.9.3.jar added multiple times to distributed cache.
        18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/joda-time_joda-time-2.3.jar added multiple times to distributed cache.
        18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/org.joda_joda-convert-1.2.jar added multiple times to distributed cache.
        18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/io.netty_netty-all-4.0.33.Final.jar added multiple times to distributed cache.
        18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/org.scala-lang_scala-reflect-2.11.8.jar added multiple times to distributed cache.
        18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/commons-collections_commons-collections-3.2.2.jar added multiple times to distributed cache.
        18/06/17 11:08:24 WARN org.apache.hadoop.hdfs.DataStreamer: Caught exception
        java.lang.InterruptedException
                at java.lang.Object.wait(Native Method)
                at java.lang.Thread.join(Thread.java:1252)
                at java.lang.Thread.join(Thread.java:1326)
                at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:973)
                at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:624)
                at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:801)
        ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
        Welcome to
              ____              __
             / __/__  ___ _____/ /__
            _\ \/ _ \/ _ `/ __/  '_/
           /__ / .__/\_,_/_/ /_/\_\   version 2.2.1
              /_/
    
        Using Python version 2.7.9 (default, Jun 29 2016 13:08:31)
        SparkSession available as 'spark'.
        >>> import org.apache.spark.sql.cassandra
        Traceback (most recent call last):
          File "<stdin>", line 1, in <module>
        ImportError: No module named org.apache.spark.sql.cassandra
        >>> import pyspark.sql.cassandra
        Traceback (most recent call last):
          File "<stdin>", line 1, in <module>
        ImportError: No module named cassandra
    

    Edit: about trying to import a Java package from pyspark: it was simply the shortest command I found that triggers the exception I'm facing. Here is another one:

        dfout.write.format("pyspark.sql.cassandra").mode("overwrite").option("confirm.truncate","true").option("spark.cassandra.connection.host","10.142.0.4").option("spark.cassandra.connection.port","9042").option("keyspace","uasb03").option("table","activite").save()
    
        > "An error occurred while calling o113.save.\n: java.lang.ClassNotFoundException: Failed to find data source: pyspark.sql.cassandra.
    

    I think I also tried org.apache.spark.sql.cassandra, but I will have to try it again: your answer clears up many things I was trying blindly (master=local[*] was one of those attempts).

    About the cluster: it was created as you suggest (for Jupyter), except for --properties. Jupyter works fine, except that I cannot use the Cassandra connector.

    Edit: following Karthik Palaniappan's advice:

    Now pyspark works when I use it over SSH. But from Jupyter I still get an error:

        df=spark.read.format("csv").option("header","true").option("inferSchema","true").option("nullValue","NA").option("timestampFormat","ddMMMyyyy:HH:mm:ss").option("quote", "\"").option("delimiter", ";").option("mode","failfast").load("gs://tidy-centaur-b1/data/myfile.csv")
    
        import pyspark.sql.functions as F
    
        dfi = df.withColumn("id", F.monotonically_increasing_id()).withColumnRenamed("NUMANO", "numano")
    
        dfi.createOrReplaceTempView("pathologie")
    
        dfi.write.format("org.apache.spark.sql.cassandra").mode("overwrite").option("confirm.truncate","true").option("spark.cassandra.connection.host","10.142.0.3").option("spark.cassandra.connection.port","9042").option("keyspace","mykeyspace").option("table","mytable").save()
    
        Py4JJavaError: An error occurred while calling o115.save.
        : java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at http://spark.apache.org/third-party-projects.html
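
    One way to check whether the connector ever reached the Jupyter-launched session is to inspect the session's Spark configuration from a notebook cell. A small diagnostic sketch (the connector coordinates should show up in spark.jars.packages or spark.jars if the --packages flag was actually applied):

        # Diagnostic sketch: print the jar-related properties of the running session.
        conf = spark.sparkContext.getConf()
        print(conf.get("spark.jars.packages", ""))
        print(conf.get("spark.jars", ""))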
    

    I recreated the cluster following your advice:

        gcloud dataproc clusters create spark-cluster \
             --async \
             --project=tidy-centaur-205516 \
             --region=us-east1 \
             --zone=us-east1-b \
             --bucket=tidy-centaur-b1 \
             --image-version=1.2 \
             --num-masters=1 \
             --master-boot-disk-size=10GB \
             --master-machine-type=n1-standard-2 \
             --num-workers=2 \
             --worker-boot-disk-size=10GB \
             --worker-machine-type=n1-standard-1 \
             --metadata 'CONDA_PACKAGES="numpy pandas scipy matplotlib",PIP_PACKAGES=pandas-gbq' \
             --properties spark:spark.packages=com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 \
             --initialization-actions=gs://tidy-centaur-b1/init-cluster.sh,gs://dataproc-initialization-actions/jupyter2/jupyter2.sh
    

    init-cluster.sh installs Cassandra.

    I ran jupyter notebook --generate-config and modified the pyspark kernel.json:

        {
         "argv": [
            "bash",
            "-c",
            "PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='kernel -f {connection_file}' pyspark"],
         "env": {
            "PYSPARK_SUBMIT_ARGS": "pyspark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0"
         },
         "display_name": "PySpark",
         "language": "python"
        }
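
    One detail that may matter here: spark-submit treats everything after the primary resource (pyspark-shell) as application arguments, so flags such as --packages have to come before pyspark-shell in PYSPARK_SUBMIT_ARGS. A minimal sketch of the env entry with that ordering:

        "env": {
           "PYSPARK_SUBMIT_ARGS": "--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 pyspark-shell"
        }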
    
    1 Answer  |  last active 6 years ago
  •  Karthik Palaniappan  ·  7 years ago

    As the spark-cassandra-connector's docs show, you use the connector through a DataFrame reader/writer, e.g. spark.read.format("org.apache.spark.sql.cassandra")..., rather than importing a Java/Scala package from Python.

    Please use the Jupyter or Jupyter2 (Python 2 + pip) initialization actions to install Jupyter + PySpark correctly. Importantly, you don't want to use --master=local[*], which would run Spark only on the master node instead of on YARN.

    Also, the --packages flag corresponds to the spark.jars.packages property; you can set Spark properties when creating the cluster with --properties spark:spark.jars.packages=<package>.

    So I think you want something like this:

    gcloud dataproc clusters create <cluster-name> \
        --initialization-actions gs://dataproc-initialization-actions/jupyter/jupyter.sh \
        --properties spark:spark.jars.packages=datastax:spark-cassandra-connector:2.3.0-s_2.11
    

     spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="kv", keyspace="test") \
        .load().show()
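
    Writing back to Cassandra goes through the same data source name. A minimal sketch for some existing DataFrame df, assuming the keyspace test and table kv already exist with a matching schema:

        # Sketch only: append a DataFrame to an existing Cassandra table.
        df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(table="kv", keyspace="test") \
            .mode("append") \
            .save()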