java - Sending Data to Kafka Producer
I am trying to read a 100k file and send it to a Kafka topic. Below is my Kafka code, which sends the data to kafka-console-consumer. When I send the data, this is what I receive:
java.util.stream.ReferencePipeline$Head@e9e54c2
Here is a sample of a single record of the data I am sending:
173|172686|548247079|837113012|0x548247079f|7|173|172686a|0|173|2059 22143|0|173|1|173|172686|||0|||7|0||7|||7|172686|allowallservices|?20161231:22143|548247079||0|173||172686|5:2266490827:dccinter;20160905152146;2784
Any suggestions on how to get the data to arrive as shown above? Thanks.
Code:
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

@SuppressWarnings("unused")
public class hundredkrecords {
    private static String scurrentline;

    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String filename = "/users/sreeeedupuganti/downloads/octfwriter.txt";

        //read file stream, try-with-resources
        try (Stream<String> stream = Files.lines(Paths.get(filename))) {
            stream.forEach(System.out::println);
            kafka(stream.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void kafka(String stream) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        producer.send(new KeyedMessage<String, String>("test", stream));
        producer.close();
    }
}
The problem is in the line kafka(stream.toString());
The Java Stream class doesn't override the toString method, so you get the default from Object, which returns getClass().getName() + '@' + Integer.toHexString(hashCode()). That's what you receive.
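To see the effect in isolation, here is a minimal sketch that needs no Kafka at all. The class and method names (StreamToStringDemo, describe) are made up for illustration, and the two sample records are shortened placeholders.

```java
import java.util.stream.Stream;

class StreamToStringDemo {
    // Stream does not override toString(), so this returns Object.toString():
    // the runtime class name, '@', and the hash code in hex.
    static String describe(Stream<String> lines) {
        return lines.toString();
    }

    public static void main(String[] args) {
        Stream<String> lines = Stream.of("173|172686", "173|172687");
        // Prints an identity string rather than the two records;
        // the part after '@' varies from run to run.
        System.out.println(describe(lines));
    }
}
```

Calling toString() never reads any element of the stream, which is why none of the file's content reaches the topic.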
To receive the whole file in Kafka, you have to convert it manually into one String (or an array of bytes).
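One way to do that conversion, as a rough sketch: read all bytes of the file and decode them. The names WholeFileAsString and readWhole are hypothetical, and UTF-8 is an assumption about the file's encoding. The demo in main writes a small temporary file so it can run on its own.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class WholeFileAsString {
    // Reads every byte of the file and decodes it as UTF-8 into one String.
    // The whole file is held in memory, so this only suits files of modest size.
    static String readWhole(Path file) throws IOException {
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Temporary file with two shortened placeholder records.
        Path file = Files.createTempFile("records", ".txt");
        Files.write(file, "173|172686\n173|172687\n".getBytes(StandardCharsets.UTF_8));

        // payload is what would be passed to kafka(...) instead of stream.toString().
        String payload = readWhole(file);
        System.out.print(payload);

        Files.delete(file);
    }
}
```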
Please note that Kafka has a limit on message size.
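Because of that limit, it may be simpler to send each line as its own message rather than the whole file at once. The sketch below is only an illustration under assumptions: LineSender and sendLines are made-up names, the send callback is a stand-in for something like line -> producer.send(new KeyedMessage<String, String>("test", line)), and maxBytes is a placeholder for whatever your broker actually allows (the message.max.bytes broker setting, roughly 1 MB by default as far as I know).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.stream.Stream;

class LineSender {
    // Hands each line to `send`, skipping lines whose UTF-8 encoding is
    // larger than maxBytes. Returns the number of lines handed to `send`.
    static long sendLines(Stream<String> lines, int maxBytes, Consumer<String> send) {
        long sent = 0;
        // Traverse the stream exactly once; a Stream cannot be reused after
        // a terminal operation such as forEach.
        for (String line : (Iterable<String>) lines::iterator) {
            if (line.getBytes(StandardCharsets.UTF_8).length > maxBytes) {
                System.err.println("Skipping oversized record");
                continue;
            }
            send.accept(line);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        // In real use the stream would come from Files.lines(Paths.get(filename)),
        // opened in a try-with-resources block.
        Stream<String> lines = Stream.of("173|172686", "173|172687");
        long sent = sendLines(lines, 1_000_000, line -> System.out.println("send: " + line));
        System.out.println(sent + " records sent");
    }
}
```

With this shape the producer would be created once, before the loop, and closed once after it, instead of being opened and closed for every record.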