java - Spark: Best way to broadcast KafkaProducer to Spark streaming -


To broadcast a KafkaProducer to the Spark executors, I have created the wrapper below:

public class KafkaSink implements Serializable {
    private static KafkaProducer<String, String> producer = null;

    public KafkaProducer<String, String> getInstance(final Properties properties) {
        if (producer == null) {
            producer = new KafkaProducer<>(properties);
        }
        return producer;
    }

    public void close() {
        producer.close();
    }
}

and I am using it as below:

JavaSparkContext jsc = new JavaSparkContext(sc);
Broadcast<KafkaSink> kafkaSinkBroadcast = jsc.broadcast(new KafkaSink());
dataset.toJavaRDD().foreach(row ->
    kafkaSinkBroadcast.getValue()
        .getInstance(kafkaProducerProps())
        .send(new ProducerRecord<String, String>(topic, row.mkString(", "))));

I wanted to know whether this is the right way to do it, or whether there is a better way.

I can recommend this blog post. In short, you should create a serializable sink for each partition, passing along a 'recipe' (a factory function) that creates the Kafka producer lazily on the executor, rather than trying to serialize the producer itself.
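The pattern above can be sketched as follows. Only the serializable recipe travels to the executors; the producer field is `transient` and is built on first use inside each JVM. The names here (`LazySink`, `FakeProducer`) are hypothetical stand-ins so the sketch runs without Kafka or Spark on the classpath; in real code the recipe would return a `KafkaProducer<String, String>` built from your `Properties`, and you would call the sink from `foreachPartition`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for KafkaProducer, so the sketch is self-contained.
class FakeProducer {
    final List<String> sent = new ArrayList<>();
    void send(String record) { sent.add(record); }
    void close() { }
}

// Serializable sink holding only a 'recipe'; the producer itself is
// transient, so it is never serialized and is created lazily per executor.
class LazySink implements Serializable {
    interface SerializableSupplier extends Supplier<FakeProducer>, Serializable { }

    private final SerializableSupplier recipe;
    private transient FakeProducer producer; // rebuilt on each executor JVM

    LazySink(SerializableSupplier recipe) { this.recipe = recipe; }

    FakeProducer get() {
        if (producer == null) {
            producer = recipe.get(); // first use on this JVM builds the producer
        }
        return producer;
    }
}

public class LazySinkDemo {
    public static void main(String[] args) {
        LazySink sink = new LazySink(FakeProducer::new);
        sink.get().send("hello");
        sink.get().send("world");
        // The same producer instance is reused across calls on this JVM.
        System.out.println(sink.get().sent.size());
    }
}
```

With Spark, you would broadcast the `LazySink` (or simply capture it in the closure, since it is small) and write records inside `rdd.foreachPartition`, so each partition reuses one producer instead of opening a connection per record.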
