
Apache Spark Streaming integration test fails

  • Glen · 9 years ago

    I have been trying to track down a problem with some unit/integration tests I wrote for an Apache Spark project.

    With Spark 1.1.1 my tests passed. When I tried to upgrade to 1.4.0 (I also tried 1.4.1), the tests started failing.

    I have managed to reduce the code needed to reproduce the problem to the small integration test below.

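    Interestingly, if I comment out the @RunWith annotation, the test passes. Obviously I don't need @RunWith for this reduced test, but the real tests make fairly extensive use of mocks, so I would rather not have to give up PowerMock.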

    package com.example;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.modules.junit4.PowerMockRunner;
    
    @RunWith(PowerMockRunner.class)
    public class SampleTest {
    
        @Before
        public void setup() throws Exception {
            SparkConf conf = new SparkConf(false).setMaster("local[2]").setAppName("My app");
            JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1000));
        }
    
        @Test
        public void exampleTest() {
        }
    }
    

    Here is the exception I see:

    java.io.IOException: failure to login
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:796)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:842)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:80)
        at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:133)
        at com.example.SampleTest.setup(SampleTest.java:19)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.internal.runners.MethodRoadie.runBefores(MethodRoadie.java:133)
        at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:96)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282)
        at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87)
        at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120)
        at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
        at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122)
        at org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106)
        at org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53)
        at org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
    Caused by: javax.security.auth.login.LoginException: Can't find user name
        at org.apache.hadoop.security.UserGroupInformation$HadoopLoginModule.commit(UserGroupInformation.java:197)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:784)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext$5.run(LoginContext.java:721)
        at javax.security.auth.login.LoginContext$5.run(LoginContext.java:719)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokeCreatorPriv(LoginContext.java:718)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:591)
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:771)
        ... 38 more
    

    The versions of the various dependencies are:

    • hadoop-client 2.6
    • Apache Spark 1.4.0/1.4.1
    • JUnit 4.12
    • EasyMock 3.31
    • PowerMock 1.6.2

    I have tried this with various versions of Spark. The test above passes with the following versions:

    • 1.1.1
    • 1.2.2

    It starts failing as of Spark 1.3.0.

    Any ideas? What do I need to change to get this working?

  • Answer by mattinbits · 9 years ago

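    Look at the line in the SparkContext source that causes the exception while trying to get the current user name. In version 1.2 there is a fallback default, SparkContext.SPARK_UNKNOWN_USER, so a currently logged-in user is not required: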

    // Set SPARK_USER for user who is running SparkContext.
    val sparkUser = Option {
      Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
    }.getOrElse {
      SparkContext.SPARK_UNKNOWN_USER
    }
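
    To make the 1.2 fallback order explicit, here is a rough Java transliteration of the Scala snippet above. It is not Spark code: LegacySparkUser and resolve are hypothetical names, and the environment, the system properties and the SPARK_UNKNOWN_USER value are passed in as parameters so the logic can be exercised without Spark. The point is that every branch is a plain lookup; nothing calls into Hadoop.

```java
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical Java model of the Spark 1.2 user lookup (not Spark code).
 * It only mirrors the fallback order of the Scala snippet.
 */
final class LegacySparkUser {

    private LegacySparkUser() {}

    /**
     * @param env         stand-in for System.getenv()
     * @param sysProps    stand-in for System.getProperties()
     * @param unknownUser stand-in for SparkContext.SPARK_UNKNOWN_USER
     */
    static String resolve(Map<String, String> env, Properties sysProps, String unknownUser) {
        // 1. The SPARK_USER environment variable wins if it is present.
        String fromEnv = env.get("SPARK_USER");
        if (fromEnv != null) {
            return fromEnv;
        }
        // 2. Otherwise use the JVM's user.name system property.
        String fromProps = sysProps.getProperty("user.name");
        if (fromProps != null) {
            return fromProps;
        }
        // 3. Otherwise return a fixed placeholder; no Hadoop login is attempted.
        return unknownUser;
    }
}
```

    Called as LegacySparkUser.resolve(System.getenv(), System.getProperties(), placeholder), it can only ever return a string, which is why the 1.1.1 and 1.2.2 runs never reached UserGroupInformation.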
    

    The code introduced in version 1.3 no longer has a default user, which is why the error does not occur in earlier versions:

    // Set SPARK_USER for user who is running SparkContext.
    val sparkUser = Utils.getCurrentUserName()
    

    This calls the following code in Utils:

    /**
     * Returns the current user name. This is the currently logged in user, unless that's been
     * overridden by the `SPARK_USER` environment variable.
     */
    def getCurrentUserName(): String = {
      Option(System.getenv("SPARK_USER"))
        .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
    }
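
    For comparison, here is a similar Java sketch of getCurrentUserName. Because Scala's Option.getOrElse takes its argument by name, the UserGroupInformation call is only evaluated when SPARK_USER is absent; the sketch mimics that with an if. CurrentSparkUser and LoginLookup are hypothetical names, and LoginLookup merely stands in for UserGroupInformation.getCurrentUser().getShortUserName(), which can throw IOException (as in the "failure to login" trace in the question).

```java
import java.io.IOException;
import java.util.Map;

/** Hypothetical Java model of Spark 1.3+ Utils.getCurrentUserName (not Spark code). */
final class CurrentSparkUser {

    /** Stand-in for UserGroupInformation.getCurrentUser().getShortUserName(). */
    @FunctionalInterface
    interface LoginLookup {
        String shortUserName() throws IOException;
    }

    private CurrentSparkUser() {}

    static String resolve(Map<String, String> env, LoginLookup hadoopLogin) throws IOException {
        String fromEnv = env.get("SPARK_USER");
        if (fromEnv != null) {
            // Short-circuit: the Hadoop login lookup is never invoked.
            return fromEnv;
        }
        // No default any more: a login failure propagates to the caller
        // (in Spark, the SparkContext constructor).
        return hadoopLogin.shortUserName();
    }
}
```

    With an empty environment and a lookup that throws, the exception propagates, just as it reaches the SparkContext constructor in the trace; with SPARK_USER present, the lookup is never called.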
    

    If you set the environment variable SPARK_USER, you should be able to avoid the branch into UserGroupInformation, which is what throws the exception.
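
    Note that a running JVM cannot change its own environment (System.getenv() returns an unmodifiable map), so SPARK_USER has to be set by whatever launches the tests: the IDE run configuration, the build tool, or a parent process. As one hedged illustration of the last option, this sketch builds a ProcessBuilder for a child JVM with SPARK_USER added to its environment. SparkUserLauncher and childJvmWithSparkUser are hypothetical names, and reusing the parent's java.home and java.class.path is an assumption about how the tests are packaged.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: launches a child JVM whose environment includes SPARK_USER. */
final class SparkUserLauncher {

    private SparkUserLauncher() {}

    /** Builds (but does not start) the ProcessBuilder for the child JVM. */
    static ProcessBuilder childJvmWithSparkUser(String sparkUser, String mainClass, String... args) {
        // Reuse the current JVM's java binary and classpath for the child.
        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        List<String> command = new ArrayList<>();
        command.add(javaBin);
        command.add("-cp");
        command.add(System.getProperty("java.class.path"));
        command.add(mainClass);
        Collections.addAll(command, args);

        ProcessBuilder pb = new ProcessBuilder(command);
        // The child starts with a copy of this process's environment plus SPARK_USER.
        Map<String, String> env = pb.environment();
        env.put("SPARK_USER", sparkUser);
        // Forward the child's stdout/stderr to this process.
        pb.inheritIO();
        return pb;
    }
}
```

    For example, childJvmWithSparkUser(System.getProperty("user.name"), "org.junit.runner.JUnitCore", "com.example.SampleTest").start().waitFor() would run the sample test through JUnit 4's command-line runner with SPARK_USER set.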

    UserGroupInformation is a Hadoop security class, and it looks like PowerMock prevents it from working properly.
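
    If you keep PowerMockRunner, it may help to fail fast with a clearer message when SPARK_USER is missing, rather than getting the "failure to login" trace from deep inside SparkContext. A minimal sketch, assuming you call it in the @Before method before constructing the JavaStreamingContext; SparkUserGuard and requireSparkUser are hypothetical names, and, like the Option(System.getenv(...)) check in Spark, it only treats a null value as missing.

```java
import java.util.Map;

/** Hypothetical test helper: checks SPARK_USER before a Spark context is created. */
final class SparkUserGuard {

    private SparkUserGuard() {}

    /** Returns SPARK_USER from the given environment, or throws with a descriptive hint. */
    static String requireSparkUser(Map<String, String> env) {
        String user = env.get("SPARK_USER");
        if (user == null) {
            throw new IllegalStateException(
                "SPARK_USER is not set; without it Spark 1.3+ asks Hadoop's UserGroupInformation "
                    + "for the user, which fails under PowerMockRunner. "
                    + "Set SPARK_USER in the test run configuration.");
        }
        return user;
    }
}
```

    In SampleTest.setup() this would be SparkUserGuard.requireSparkUser(System.getenv()); as the first statement.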