scala - Spark: java.io.NotSerializableException


I want to pass a path to the saveAsTextFile call that runs in Spark Streaming. However, I get a java.io.NotSerializableException. In similar cases I use a skeleton object, but in this particular case I don't know how to solve the issue. Can anyone help me, please?

import java.util
import java.util.Properties

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.lambdaworks.jacks.JacksMapper
import org.sedis._
import redis.clients.jedis._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

class KafkaTestConsumer(val zkQuorum: String,
                        val group: String,
                        val topicMessages: String,
                        val path: String) extends Logging {

  // ...
  // dstream: DStream[String]
  dstream.foreachRDD { rdd =>
    // rdd: RDD[String], each String is a JSON document
    // parse each JSON
    // splitted: RDD[Map[String, Any]]
    val splitted = rdd.map(line => Utils.parseJson(line))

    // ...
    splitted.saveAsTextFile(path)
  }
}

object Utils {

  def parseJson[T](json: String): Map[String, Any] = {
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.readValue[Map[String, Any]](json)
  }
}
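The stack trace below shows the DStream graph being serialized when the StreamingContext is started. A minimal sketch of how the elided setup inside the class might look; the run method, batch interval, and topic map here are assumptions for illustration, not part of the original code:

  // Hypothetical setup inside KafkaTestConsumer (names and values are assumptions)
  def run(): Unit = {
    val conf = new SparkConf().setAppName("KafkaTestConsumer")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Receiver-based Kafka stream; keep only the message values (JSON strings)
    val topicMap = topicMessages.split(",").map((_, 1)).toMap
    val dstream  = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    dstream.foreachRDD { rdd =>
      val splitted = rdd.map(line => Utils.parseJson(line))
      // 'path' is a field, so this closure also references the enclosing instance
      splitted.saveAsTextFile(path)
    }

    // The DStream graph is serialized here, which triggers the exception below
    ssc.start()
    ssc.awaitTermination()
  }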

the whole stacktrace:

16/09/22 17:03:28 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.consumer.kafka.KafkaTestConsumer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:180)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:175)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:560)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at org.consumer.kafka.KafkaDecisionsConsumer.run(KafkaTestConsumer.scala:136)
    at org.consumer.ServiceRunner$.main(QueuingServiceRunner.scala:20)
    at org.consumer.ServiceRunner.main(QueuingServiceRunner.scala)

The problem is that you are using the RDD action saveAsTextFile inside the DStream action foreachRDD, which runs on the workers; that is why you get the serialization error. For example, when the code above runs, a worker tries to execute the RDD action splitted.saveAsTextFile(path), and that triggers the serialization error. You can do it as below:

// Parse the stream with a DStream transformation and save each batch with the
// DStream output operation saveAsTextFiles, instead of calling the RDD action
// saveAsTextFile inside foreachRDD.
// splitted: DStream[Map[String, Any]]
val splitted = dstream.map(line => Utils.parseJson(line))
// ...
splitted.saveAsTextFiles(path)
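Note that the exception names KafkaTestConsumer itself because path is a field of that class, so referencing it inside the closure also captures the enclosing instance. As an alternative sketch that keeps the original foreachRDD structure, copying the field into a local value avoids that capture (the localPath name is only illustrative):

// Copy the field into a local val so the closure captures only a String,
// not the non-serializable KafkaTestConsumer instance
val localPath = path

dstream.foreachRDD { rdd =>
  val splitted = rdd.map(line => Utils.parseJson(line))
  splitted.saveAsTextFile(localPath)
}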
