I have a simple case class:
case class User(id: String, login: String, key: String)
I added a "name" field to it:
case class User(id: String, login: String, name: String, key: String)
Then I added this field to the Avro schema (user.avsc):
{
  "namespace": "test",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "login", "type": "string" },
    { "name": "name", "type": "string" },
    { "name": "key", "type": "string" }
  ]
}
This class is used in another case class:
case class AuthRequest(user: User, session: String)
Its schema (the auth request .avsc file):
{
  "namespace": "test",
  "type": "record",
  "name": "AuthRequest",
  "fields": [
    { "name": "user", "type": "User" },
    { "name": "session", "type": "string" }
  ]
}
After that change, my consumer started throwing an exception:
Consumer.committableSource(consumerSettings, Subscriptions.topics("token_service_auth_request"))
  .map { msg =>
    Try {
      val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
      val input: AvroBinaryInputStream[AuthRequest] = AvroInputStream.binary[AuthRequest](in)
      val result: AuthRequest = input.iterator.toSeq.head // <-- the exception is thrown here
      msg.committableOffset.commitScaladsl()
      (msg.record.value(), result, msg.record.key())
    } match {
      case Success((a: Array[Byte], value: AuthRequest, key: String)) =>
        log.info(s"listener got $msg -> $a -> $value")
        context.parent ! value
      case Failure(e) => e.printStackTrace()
    }
  }
  .runWith(Sink.ignore)
java.util.NoSuchElementException: head of empty stream
    at scala.collection.immutable.Stream$Empty$.head(Stream.scala:1104)
    at scala.collection.immutable.Stream$Empty$.head(Stream.scala:1102)
    at test.consumers.AuthRequestListener.$anonfun$new$2(AuthRequestListener.scala:39)
    at scala.util.Try$.apply(Try.scala:209)
    at test.consumers.AuthRequestListener.$anonfun$new$1(AuthRequestListener.scala:36)
    at test.consumers.AuthRequestListener.$anonfun$new$1$adapted(AuthRequestListener.scala:35)
    at akka.stream.impl.fusing.Map$$anon$9.onPush(Ops.scala:51)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:472)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
    at akka.actor.ActorCell.invoke(ActorCell.scala:557)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
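The top frames show only that `.head` was called on an empty collection, which means the iterator yielded no records for that message. Why it yielded none is not visible in the trace. The following is a minimal sketch, independent of Kafka and avro4s, that reproduces the same kind of failure and shows a variant that reports the empty case explicitly. `HeadOfEmptyStream` and `firstDecoded` are hypothetical names used for illustration only, not part of the code above or of any library.

```scala
import scala.util.{Failure, Success, Try}

object HeadOfEmptyStream {
  // Hypothetical helper: returns the first record, or a descriptive failure
  // when the iterator is empty, instead of letting `.head` throw.
  def firstDecoded[A](records: Iterator[A]): Try[A] =
    if (records.hasNext) Success(records.next())
    else Failure(new IllegalStateException("decoder produced no records for this message"))

  def main(args: Array[String]): Unit = {
    // Same shape as `input.iterator.toSeq.head` when nothing was decoded:
    // the call fails with a NoSuchElementException.
    val unsafe = Try(List.empty[String].iterator.toSeq.head)
    println(unsafe.isFailure)

    // The explicit variant turns the empty case into a Failure with a clear message.
    println(firstDecoded(List.empty[String].iterator))
    println(firstDecoded(Iterator("record")))
  }
}
```

Handling the empty case this way does not fix decoding, but it separates "nothing was decoded" from other errors inside the `Try` block, which may make the underlying cause easier to track down.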
I tried cleaning the build and invalidating the caches; it seems that a previous version of the schema is cached somewhere.
Please help!