代码之家  ›  专栏  ›  技术社区  ›  Altin Mag.

如何将spark连接到mongodb?

  •  0
  • Altin Mag.  · 技术社区  · 2 年前

    当我尝试加载数据时,我会收到以下消息

    Py4JJava错误:调用o172.load时出错。示例:java.lang.ClassNotFoundException:未能找到数据源:com.mongodb.spark.sql.DefaultSource。请在 https://spark.apache.org/third-party-projects.html 我正在写jupyternotebook

    这个错误发生在DefauldSource,它的包有一些问题,但我想我安装了所有的包

    这是我装货的地方

     df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load() 
     #df <- hier beginns my trouble idk why
    

    我安装了pyspark和pymongo

    #!pip安装pyspark pymongo

    已经满足的要求:/usr/local/spark-3.3.2-bin-hadoop3/python中的pyspark(3.3.2)已经满足的需求:py4j==0.10.9.5 in/opt/conda/lib/python3.10/site-packages(来自pyspark)(0.10.9.5)已经满足需求:pymongo in/opt/cona/lib/pyton3.10/site-package(4.3.3)已经满足要求:dnspython<3.0.0,>=1.16.0 in/opt/conda/lib/python3.10/site-packages(来自pymongo)(2.3.0)

    这是我的密码。我正在尝试选择列“AGE”、“W”、“L”、“DD2”、“TD3”、“PLUS_MINUS”。

    
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    
     spark = SparkSession.builder \
        .appName("MongoDB Spark Connector Example") \
        .config("spark.mongodb.input.uri", "mongodb:xxxx:<PORT>/<DB>.<COLLECTION>") \
        .getOrCreate()
    
     season_year = "1996"
     team_abbreviation = "CHH"
    
     df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load() 
     #df <- hier beginns my trouble idk why
    
     filtered_df = df.filter((col("season") == season_year) & (col("TEAM_ABBREVIATION") ==   team_abbreviation))
    
     selected_columns = ["AGE", "W", "L", "DD2", "TD3", "PLUS_MINUS"]
     selected_df = filtered_df.select(*selected_columns)
    
    
     selected_df.show()
    
    
    
    1 回复  |  直到 2 年前
        1
  •  1
  •   Freeman    2 年前

    据我所知,你可能丢了包裹 mongo-spark-connector ,所以首先试着这样安装:

    !pip install --upgrade pymongo[srv],pyspark,pyarrow
    

    并确保您已添加 mongo火花连接器 震击你的火花 classpath ,然后将其添加到Spark会话配置中,并替换 3.0.1 使用已安装的连接器版本:

    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    

    使现代化
    所以试着降低 mongo火花连接器 版本转换为较低版本,看看它是否有效?您也可以尝试在代码中显式指定包版本,如下所示(更改 xxxx:/. 用你的 MongoDB 连接字符串):

    df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("spark.mongodb.input.uri", "mongodb:xxxx:<PORT>/<DB>.<COLLECTION>") \
        .load()
    

    你确定你已经正确地将连接器jar文件添加到类路径中了吗?请这样检查

    echo $SPARK_CLASSPATH
    

    祝你好运