java - Sending Data to Kafka Producer -


i trying read 100k file , send kafka topic. here kafka code sends data kafka-console-consumer. when sending data receiving data

java.util.stream.referencepipeline$head@e9e54c2 

here sample single record data sending:

173|172686|548247079|837113012|0x548247079f|7|173|172686a|0|173|2059 22143|0|173|1|173|172686|||0|||7|0||7|||7|172686|allowallservices|?20161231:22143|548247079||0|173||172686|5:2266490827:dccinter;20160905152146;2784 

any suggestion data had showned in above...thanks

code:

import java.io.bufferedreader; import java.io.filenotfoundexception; import java.io.filereader; import java.io.ioexception; import java.nio.file.files; import java.nio.file.paths; import java.util.properties; import java.util.properties; import java.util.concurrent.executionexception; import java.util.stream.stream;  import kafka.javaapi.producer.producer; import kafka.producer.keyedmessage; import kafka.producer.producerconfig;  @suppresswarnings("unused") public class hundredkrecords {     private static string scurrentline;    public static void main(string args[]) throws interruptedexception, executionexception{         string filename = "/users/sreeeedupuganti/downloads/octfwriter.txt";         //read file stream, try-with-resources        try (stream<string> stream = files.lines(paths.get(filename))) {            stream.foreach(system.out::println);            kafka(stream.tostring());        } catch (ioexception e) {            e.printstacktrace();        }        }     public static void kafka(string stream)  {        properties props = new properties();        props.put("metadata.broker.list", "localhost:9092");        props.put("serializer.class", "kafka.serializer.stringencoder");        props.put("partitioner.class","kafka.producer.defaultpartitioner");        props.put("request.required.acks", "1");        producerconfig config = new producerconfig(props);        producer<string, string> producer = new producer<string, string>(config);        producer.send(new keyedmessage<string, string>("test",stream));        producer.close();    } } 

problem in line kafka(stream.tostring());

java stream class doesn't override method tostring. default returns getclass().getname() + '@' + integer.tohexstring(hashcode()). that's recieve.

in order receive in kafka whole file, have manually convert 1 string (array of bytes).

please, note, kafka has limit message size.


Comments

Popular posts from this blog

unity3d - Rotate an object to face an opposite direction -

angular - Is it possible to get native element for formControl? -

javascript - Why jQuery Select box change event is now working? -