Scala - Spark: java.io.NotSerializableException
I want to pass a path to the saveAsTextFile function, which runs in Spark Streaming. However, I get a java.io.NotSerializableException. In similar cases I have used a skeleton object, but in this particular case I don't know how to solve the issue. Can anyone help me, please?
import java.util
import java.util.Properties
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.lambdaworks.jacks.JacksMapper
import org.sedis._
import redis.clients.jedis._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

class KafkaTestConsumer(val zkQuorum: String,
                        val group: String,
                        val topicMessages: String,
                        val path: String) extends Logging {

  // ...
  // dstream: DStream[String]
  dstream.foreachRDD { rdd =>
    // rdd -> RDD[String], each string is a JSON document
    // parsing each JSON
    // splitted -> RDD[Map[String,Any]]
    val splitted = rdd.map(line => Utils.parseJSON(line))
    // ...
    splitted.saveAsTextFile(path)
  }
}

object Utils {
  def parseJSON[T](json: String): Map[String, Any] = {
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.readValue[Map[String, Any]](json)
  }
}
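As an aside, unrelated to the exception: parseJSON builds a new ObjectMapper for every record, which is expensive. Since a Scala object is a per-JVM singleton, the mapper could be hoisted into a val; a minimal sketch, assuming the same Jackson modules as in the question:

object Utils {
  // Built once per JVM (Scala objects are lazily initialized singletons);
  // ObjectMapper is thread-safe once fully configured.
  private val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  def parseJSON(json: String): Map[String, Any] =
    mapper.readValue[Map[String, Any]](json)
}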
The whole stack trace:
16/09/22 17:03:28 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.consumer.kafka.KafkaTestConsumer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:180)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:175)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:560)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at org.consumer.kafka.KafkaDecisionsConsumer.run(KafkaTestConsumer.scala:136)
    at org.consumer.ServiceRunner$.main(QueuingServiceRunner.scala:20)
    at org.consumer.ServiceRunner.main(QueuingServiceRunner.scala)
The problem is that the function you pass to foreachRDD references path, which is a field of KafkaTestConsumer, and a field reference is really a reference through this. When Spark serializes that closure (here it happens while the DStream graph is serialized during StreamingContext.start(), as the stack trace shows), it therefore tries to serialize the whole KafkaTestConsumer instance, which is not serializable, and you get the exception above.
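To see the capture in isolation, here is a minimal, Spark-free sketch (the class name Owner and the path value are made up for illustration); serializing the function value fails exactly like the consumer above:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for KafkaTestConsumer: a class that does NOT extend Serializable
class Owner(val path: String) {
  // `path` in the lambda body means `this.path`, so the function value
  // closes over the whole Owner instance, not just the String
  val addPath: String => String = line => line + path
}

object Demo {
  def main(args: Array[String]): Unit = {
    val f = new Owner("/tmp/out").addPath
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(f) // throws java.io.NotSerializableException: Owner
  }
}

Spark hits the same wall when it serializes the DStream graph: the foreachRDD closure mentions path, so it drags KafkaTestConsumer along.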
You can avoid the capture by using the DStream output operation saveAsTextFiles instead of calling the RDD action inside foreachRDD; its path argument is evaluated once on the driver, so no field reference ends up inside a closure:

// rdd contents -> String, each string a JSON document
// splitted -> DStream[Map[String,Any]]
val splitted = dstream.map(line => Utils.parseJSON(line))
// ...
// DStream-level save: one output directory per batch, named <path>-<batch time>
splitted.saveAsTextFiles(path)
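If you would rather keep the foreachRDD structure from the question, an alternative (a sketch under the same assumptions) is to copy the field into a local value first, so the closure captures only the String:

// local copy: the closure below captures outputPath (a String), not `this`
val outputPath = path
dstream.foreachRDD { rdd =>
  val splitted = rdd.map(line => Utils.parseJSON(line))
  splitted.saveAsTextFile(outputPath)
}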