代码之家  ›  专栏  ›  技术社区  ›  HoTicE

AVRO模式更新出现问题

  •  1
  • HoTicE  · 技术社区  · 6 年前

    我有一个简单的案例类:

      case class User(id: String, login: String, key: String)
    

    我将添加字段“名称”

      case class User(id: String, login: String, name: String, key: String)
    

    然后在avro模式中添加此字段(user.avsc)

    {
      "namespace": "test",
      "type": "record",
      "name": "User",
      "fields": [
        { "name": "id", "type": "string" },
        { "name": "login", "type": "string" },
        { "name": "name", "type": "string" },
        { "name": "key", "type": "string" }
      ]
    }
    

    此类用于其他事例类:

    case class AuthRequest(user: User, session: String)
    

    chema(授权请求.avsc)

    {
      "namespace": "test",
      "type": "record",
      "name": "AuthRequest",
      "fields": [
        { "name": "user", "type": "User" },
        { "name": "session", "type": "string" }
      ]
    }
    

    在那次改变之后,我的消费者开始抛出例外

    Consumer.committableSource(consumerSettings, Subscriptions.topics("token_service_auth_request"))
        .map { msg =>
          Try {
            val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
            val input: AvroBinaryInputStream[AuthRequest] = AvroInputStream.binary[AuthRequest](in)
            val result: AuthRequest = input.iterator.toSeq.head !!!! here is exception
            msg.committableOffset.commitScaladsl()
    
            (msg.record.value(), result, msg.record.key())
          } match {
            case Success((a: Array[Byte], value: AuthRequest, key: String)) =>
              log.info(s"listener got $msg -> $a -> $value")
    
              context.parent ! value
    
            case Failure(e) => e.printStackTrace()
          }
        }
        .runWith(Sink.ignore)
    

    java.util.nosuchelementexception:位于的空流的头 scala.collection.immutable.stream$empty$.head(stream.scala:1104)位于 scala.collection.immutable.stream$empty$.head(stream.scala:1102)位于 test.consumers.authrequestListener.$anonfun$new$2(authrequestListener.scala:39) 在scala.util.try$.apply(try.scala:209)的 test.consumers.authrequestListener.$anonfun$new$1(authrequestListener.scala:36) 在 test.consumers.authrequestListener.$anonfun$新$1$已调整(authrequestListener.scala:35) 在akka.stream.impl.fusing.map$$anon$9.onpush(ops.scala:51)at akka.stream.impl.fusing.graphinterpreter.processpush(graphinterpreter.scala:519) 在 akka.stream.impl.fusing.graphInterpreter.processEvent(graphInterpreter.scala:482) 在 akka.stream.impl.fusing.graphinterpreter.execute(graphinterpreter.scala:378) 在 akka.stream.impl.fusing.graphinterpretershell.runbatch(actorgraphexplorer.scala:588) 在 akka.stream.impl.fusing.graphinterpretershell$asyncInput.execute(actorgraphinterpreter.scala:472) 在 akka.stream.impl.fusing.graphinterpretershell.processEvent(actorgraphexplorer.scala:563) 在 akka.stream.impl.fusing.actorgraphexplorer.akka$stream$impl$fusing$actorgraphexplorer$$processEvent(actorgraphexplorer.scala:745) 在 akka.stream.impl.fusing.actorgraphexplorer$$anonfun$接收$1.applyorelse(actorgraphexplorer.scala:760) 在Akka。演员。演员。唤醒接收(演员。scala:517)在 akka.actor.actor.aroundreceive$(actor.scala:515)在 akka.stream.impl.fusing.actorgraphexplorer.aroundreceive(actorgraphexplorer.scala:670) 在akka.actor.actorcell.receiveMessage(actorcell.scala:588)at akka.actor.actorcell.invoke(actorcell.scala:557)在 akka.dispatch.mailbox.processmailbox(mailbox.scala:258)位于 akka.dispatch.mailbox.run(mailbox.scala:225)在 akka.dispatch.mailbox.exec(mailbox.scala:235)位于 Akk.Debug。FokJoo.FokJojTask.doExc(FokJojTeask.java:260)AT 分派($:Java:1339) 在 Akk.Debug .FokCuth.FokCuangPo..RunWorkWord(FokCouthPoal.java:1979) 在 Akk.Debug .FokJoo.FrkCuleWorksTrime.Run(FokCopyWorkWorkth.java:107)

    我试图清理构建并使缓存失效-我似乎在其中的一些地方缓存了模式的早期版本。 救命啊!

    1 回复  |  直到 6 年前
        1
  •  1
  •   hlagos    6 年前

    {
      "namespace": "test",
      "type": "record",
      "name": "User",
      "fields": [
        { "name": "id", "type": "string" },
        { "name": "login", "type": "string" },
        { "name": "name", "type": ["null", "string"], "default": null },
        { "name": "key", "type": "string" }
      ]
    }