java - Spark: Best way to broadcast a KafkaProducer to Spark Streaming
To broadcast a KafkaProducer to the Spark executors, I have created the wrapper below:
    import java.io.Serializable;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;

    public class KafkaSink implements Serializable {

        private static KafkaProducer<String, String> producer = null;

        // Create the producer lazily, once per JVM, on first use.
        public KafkaProducer<String, String> getInstance(final Properties properties) {
            if (producer == null) {
                producer = new KafkaProducer<>(properties);
            }
            return producer;
        }

        public void close() {
            producer.close();
        }
    }
and I am using it as below:
    JavaSparkContext jsc = new JavaSparkContext(sc);
    Broadcast<KafkaSink> kafkaSinkBroadcast = jsc.broadcast(new KafkaSink());

    dataset.toJavaRDD().foreach(row ->
        kafkaSinkBroadcast.value()
            .getInstance(kafkaProducerProps())
            .send(new ProducerRecord<String, String>(topic, row.mkString(", "))));
I wanted to know whether this is the right way to do it, or whether there is a better way.
I can recommend a blog post on this. In short, you should create a serializable sink for each partition, passing a 'recipe' for creating the Kafka producer rather than the producer itself.
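A minimal sketch of that pattern in Java follows; the names LazyKafkaSink, SerializableSupplier, and apply are illustrative choices of mine, not taken from the blog post. The idea is that only the recipe (the producer config wrapped in a serializable lambda) is shipped to the executors, while the non-serializable KafkaProducer is built lazily on each executor on first use:

    import java.io.Serializable;
    import java.util.Properties;
    import java.util.function.Supplier;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class LazyKafkaSink implements Serializable {

        // java.util.function.Supplier is not Serializable, so define one that is.
        public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

        // Only the recipe travels over the wire; the producer is transient
        // and is rebuilt on each executor JVM.
        private final SerializableSupplier<KafkaProducer<String, String>> recipe;
        private transient KafkaProducer<String, String> producer;

        private LazyKafkaSink(SerializableSupplier<KafkaProducer<String, String>> recipe) {
            this.recipe = recipe;
        }

        public static LazyKafkaSink apply(final Properties properties) {
            // Properties is Serializable, so the lambda closing over it is too.
            return new LazyKafkaSink(() -> {
                KafkaProducer<String, String> p = new KafkaProducer<>(properties);
                // Flush and close pending records when the executor JVM exits.
                Runtime.getRuntime().addShutdownHook(new Thread(p::close));
                return p;
            });
        }

        // Synchronized so concurrent tasks on one executor build only one producer.
        private synchronized KafkaProducer<String, String> producer() {
            if (producer == null) {
                producer = recipe.get();
            }
            return producer;
        }

        public void send(String topic, String value) {
            producer().send(new ProducerRecord<>(topic, value));
        }
    }

Applied to your code, you would broadcast the sink once and write per partition, so each executor creates a single producer instead of looking one up per record:

    Broadcast<LazyKafkaSink> sinkBroadcast = jsc.broadcast(LazyKafkaSink.apply(kafkaProducerProps()));

    dataset.toJavaRDD().foreachPartition(rows ->
        rows.forEachRemaining(row ->
            sinkBroadcast.value().send(topic, row.mkString(", "))));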