Cassandra Bulk-Write performance with Java Driver is atrocious compared to MongoDB
I have built an importer for both MongoDB and Cassandra. The operations of the importer are basically the same, except for the last part where the data gets formed to match the needed Cassandra table schema or the wanted MongoDB document structure. The write performance of Cassandra is really bad compared to MongoDB and I think I'm doing something wrong.

Basically, my abstract importer class loads the data, reads it out, and passes it to the extending MongoDBImporter or CassandraImporter class to send the data to the databases. One database is targeted at a time - there are no "dual" inserts to both C* and MongoDB at the same time. The importer is run on the same machine against the same number of nodes (6).
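The question doesn't show the class hierarchy, so the following is only a hypothetical sketch of the design described above (AbstractImporter, importRecord and the type parameter are illustrative names, not from the original post):

    public abstract class AbstractImporter<T> {

        // Loads the data, reads it out, and hands each record to the
        // database-specific subclass (MongoDBImporter or CassandraImporter)
        public final void run(Iterable<T> records) throws Exception {
            for (T record : records) {
                importRecord(record);
            }
        }

        // Forms the record to match the target table schema / document
        // structure and sends it to the database
        protected abstract void importRecord(T record) throws Exception;
    }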
The problem:

The MongoDB import finished after 57 minutes. I ingested 10,000,000 documents and I expect about the same amount of rows for Cassandra. The Cassandra importer has now been running for 2.5 hours and is at 5,000,000 inserted rows. I will wait for the importer to finish and edit the actual finish time in here.
How I import into Cassandra:

I prepare two statements once before ingesting the data. Both statements are UPDATE queries because I sometimes have to append data to an existing list. The table is cleared completely before starting the import. The prepared statements get used over and over again.
    PreparedStatement statementA = session.prepare(queryA);
    PreparedStatement statementB = session.prepare(queryB);
For every row I create a BoundStatement and pass that statement to my "custom" batching method:
    BoundStatement bs = new BoundStatement(preparedStatement); // either statementA or B
    bs = bs.bind();
    // add data... several bs.setXXX(..) calls
    cassandraConnection.executeBatch(bs);
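The setXXX calls themselves are not shown in the question; against the schema further below they might look like this (a hypothetical expansion, assuming the driver 3.x named setters and illustrative values):

    BoundStatement bs = new BoundStatement(statementA);
    bs.setString("log_creator", "some_sensor");   // partition key parts
    bs.setString("date", "22-09-2016");
    bs.setInt("hour", 14);
    bs.setTimestamp("ts", new java.util.Date());  // clustering columns
    bs.setString("log_id", "someId");
    bs.setDouble("x", 1.0);
    bs.setDouble("y", 2.0);
    cassandraConnection.executeBatch(bs);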
With MongoDB I can insert 1000 documents (that's the maximum) at a time without problems. For Cassandra, the importer crashes with com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large for just 10 of my statements at some point. I'm using the following code to build the batches. Btw, I began with batch sizes of 1000, 500, 300, 200, 100, 50 and 20 before, but apparently they do not work either. I set it down to 10 and it threw the exception again. Now I'm out of ideas why it's breaking.
    private static final int MAX_BATCH_SIZE = 10;

    private Session session;
    private BatchStatement currentBatch;

    ...

    @Override
    public ResultSet executeBatch(Statement statement) {
        if (session == null) {
            throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
        }

        if (currentBatch == null) {
            currentBatch = new BatchStatement(Type.UNLOGGED);
        }

        // Collect statements until the batch is full, then execute it
        currentBatch.add(statement);
        if (currentBatch.size() == MAX_BATCH_SIZE) {
            ResultSet result = session.execute(currentBatch);
            currentBatch = new BatchStatement(Type.UNLOGGED);
            return result;
        }

        return null;
    }
My C* schema looks like this:

    CREATE TYPE stream.event (
        data_dbl frozen<map<text, double>>,
        data_str frozen<map<text, text>>,
        data_bool frozen<map<text, boolean>>
    );

    CREATE TABLE stream.data (
        log_creator text,
        date text,  // date of the timestamp
        ts timestamp,
        log_id text,  // some id
        hour int,  // just the hour of the timestamp
        x double,
        y double,
        events list<frozen<event>>,
        PRIMARY KEY ((log_creator, date, hour), ts, log_id)
    ) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC);
I sometimes need to add further new events to an existing row. That's why I need a list of UDTs. My UDT contains three maps because the event creators produce different data (key/value pairs of type string/double/boolean). I am aware of the fact that the UDTs are frozen and that I can not touch the maps of already ingested events. That's fine for me, I just need to add new events that sometimes have the same timestamp. I partition on the creator of the logs (some sensor name), the date of the record (ie. "22-09-2016") and the hour of the timestamp (to distribute the data more while keeping related data close together in a partition).
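The question does not include queryA/queryB, but given this schema, an UPDATE that appends to the events list would look something like the following hypothetical reconstruction (the exact SET clause and bind-marker order are assumptions):

    // Appends a list of event UDTs to the existing events list of one row
    String queryA =
          "UPDATE stream.data SET events = events + ? "
        + "WHERE log_creator = ? AND date = ? AND hour = ? "
        + "AND ts = ? AND log_id = ?";
    PreparedStatement statementA = session.prepare(queryA);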
I'm using Cassandra 3.0.8 with the Datastax Java Driver, version 3.1.0 in my pom. According to What is the batch limit in Cassandra?, I should not increase the batch size by adjusting batch_size_fail_threshold_in_kb in my cassandra.yaml. So... what did I do wrong, or what's wrong with my import?
UPDATE: I have adjusted my code to run async queries and store the currently running inserts in a list. Whenever an async insert finishes, it is removed from the list. When the list size exceeds a threshold and an error occurred in an insert before, the method waits 500 ms until the inserts drop below the threshold. My code automatically increases the threshold when no insert failed.
But after streaming 3,300,000 rows, there were 280,000 inserts being processed and no error had happened. That number of concurrently processed inserts seems too high. The 6 Cassandra nodes are running on commodity hardware that is 2 years old.

Is this high number (280,000 for 6 nodes) of concurrent inserts a problem? Should I add a variable like MAX_CONCURRENT_INSERT_LIMIT?
    private List<ResultSetFuture> runningInsertList;
    private static int concurrentInsertLimit = 1000;
    private static int concurrentInsertSleepTime = 500;

    ...

    @Override
    public void executeBatch(Statement statement) throws InterruptedException {
        if (this.runningInsertList == null) {
            this.runningInsertList = new ArrayList<>();
        }

        // Sleep while the currently processed number of inserts is too high
        while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
            Thread.sleep(concurrentInsertSleepTime);
        }

        ResultSetFuture future = this.executeAsync(statement);
        this.runningInsertList.add(future);

        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                runningInsertList.remove(future);
            }

            @Override
            public void onFailure(Throwable t) {
                concurrentInsertErrorOccured = true;
            }
        }, MoreExecutors.sameThreadExecutor());

        if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
            concurrentInsertLimit += 2000;
            LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
        }

        return;
    }
After using C* for a bit, I'm convinced you should really use batches only for keeping multiple tables in sync. If you don't need that feature, then don't use batches at all, because you will incur performance penalties.
The correct way to load data into C* is with async writes, with optional backpressure if your cluster can't keep up with the ingestion rate. You should replace your "custom" batching method with something that:

- performs async writes
- keeps the number of inflight writes under control
- performs some retry when a write times out
To perform async writes, use the .executeAsync method, which will return a ResultSetFuture object.
To keep the number of inflight queries under control, collect the ResultSetFuture objects retrieved from the .executeAsync method in a list, and if the list gets to (ballpark values here) say 1k elements, then wait for all of them to finish before issuing more writes. Or you can wait for the first one to finish before issuing one more write, just to keep the list full.
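A minimal sketch of that second variant (wait for a free slot before each write), assuming the DataStax Java Driver 3.x and its Guava dependency; the ThrottledWriter class and the cap of 1000 are illustrative, a Semaphore being one idiomatic way to hold the inflight count at the cap:

    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.ResultSetFuture;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.Statement;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.concurrent.Semaphore;

    public class ThrottledWriter {

        private final Session session;
        private final Semaphore inflight = new Semaphore(1000); // ballpark cap

        public ThrottledWriter(Session session) {
            this.session = session;
        }

        public void write(Statement statement) throws InterruptedException {
            inflight.acquire(); // blocks while 1000 writes are already inflight
            ResultSetFuture future = session.executeAsync(statement);
            Futures.addCallback(future, new FutureCallback<ResultSet>() {
                @Override
                public void onSuccess(ResultSet result) {
                    inflight.release();
                }

                @Override
                public void onFailure(Throwable t) {
                    inflight.release(); // a retry/failure-list hook would go here
                }
            }, MoreExecutors.sameThreadExecutor());
        }
    }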
And finally, you can check for write failures while you're waiting for an operation to complete. In that case, you could:

1. write again with the same timeout value
2. write again with an increased timeout value
3. wait some amount of time, and then write again with the same timeout value
4. wait some amount of time, and then write again with an increased timeout value
From 1 to 4 you have increasing backpressure strength. Pick the one that best fits your case. A sketch of option 4 follows.
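This is a minimal sketch of option 4, assuming driver 3.x; the attempt limit, the sleep times and the starting timeout are illustrative (12000 ms is the driver's default read timeout), and the per-statement timeout is raised via Statement.setReadTimeoutMillis:

    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.Statement;
    import com.datastax.driver.core.exceptions.OperationTimedOutException;
    import com.datastax.driver.core.exceptions.WriteTimeoutException;

    public class RetryingWriter {

        // Option 4: wait some time, then write again with an increased timeout
        public static ResultSet executeWithRetry(Session session, Statement statement)
                throws InterruptedException {
            int timeoutMillis = 12000; // start from the driver default
            for (int attempt = 0; ; attempt++) {
                try {
                    statement.setReadTimeoutMillis(timeoutMillis);
                    return session.execute(statement);
                } catch (OperationTimedOutException | WriteTimeoutException e) {
                    if (attempt >= 3) {
                        throw e; // hard fail after a few attempts
                    }
                    Thread.sleep(500L * (attempt + 1)); // back off before retrying
                    timeoutMillis *= 2;                 // and increase the timeout
                }
            }
        }
    }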
EDIT after question update

Your insert logic seems a bit broken to me:
1. I don't see any retry logic
2. You don't remove the item from the list if it fails
3. Your while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) is wrong, because you will sleep only when the number of issued queries is greater than concurrentInsertLimit, and because of point 2 your thread will just park there
4. You never set concurrentInsertErrorOccured back to false
I usually keep a list of (failed) queries for the purpose of retrying them at a later time. That gives me powerful control over the queries, and when failed queries start to accumulate I sleep for a few moments, and then keep on retrying them (up to X times, then hard fail...).

This list should be very dynamic, eg you add items there when queries fail, and remove items when you perform a retry. Then you can understand the limits of your cluster, and tune concurrentInsertLimit based on eg the average number of failed queries in the last second, or stick with a simpler approach like "pause if we have an item in the retry list", etc...
EDIT 2 after comments

Since you don't want any retry logic, I would change your code this way:
    private List<ResultSetFuture> runningInsertList;
    private static int concurrentInsertLimit = 1000;
    private static int concurrentInsertSleepTime = 500;

    ...

    @Override
    public void executeBatch(Statement statement) throws InterruptedException {
        if (this.runningInsertList == null) {
            this.runningInsertList = new ArrayList<>();
        }

        ResultSetFuture future = this.executeAsync(statement);
        this.runningInsertList.add(future);

        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                runningInsertList.remove(future);
            }

            @Override
            public void onFailure(Throwable t) {
                runningInsertList.remove(future);
                concurrentInsertErrorOccured = true;
            }
        }, MoreExecutors.sameThreadExecutor());

        // Sleep while the currently processed number of inserts is too high
        while (runningInsertList.size() >= concurrentInsertLimit) {
            Thread.sleep(concurrentInsertSleepTime);
        }

        if (!concurrentInsertErrorOccured) {
            // Increase the ingestion rate if no query failed so far
            concurrentInsertLimit += 10;
        } else {
            // Decrease the ingestion rate because at least one query failed
            concurrentInsertErrorOccured = false;
            concurrentInsertLimit = Math.max(1, concurrentInsertLimit - 50);
            while (runningInsertList.size() >= concurrentInsertLimit) {
                Thread.sleep(concurrentInsertSleepTime);
            }
        }

        return;
    }
You could optimize this procedure a bit by replacing your List<ResultSetFuture> with a counter.
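A minimal sketch of that counter-based variant, mirroring the fragment above (assuming the same driver/Guava setup, with java.util.concurrent.atomic.AtomicInteger imported; the adaptive limit adjustment is omitted for brevity):

    // An AtomicInteger replaces the list, so completions no longer pay
    // for an O(n) ArrayList.remove() on every callback
    private final AtomicInteger runningInsertCount = new AtomicInteger(0);
    private static int concurrentInsertLimit = 1000;
    private static int concurrentInsertSleepTime = 500;

    @Override
    public void executeBatch(Statement statement) throws InterruptedException {
        // Sleep while the number of inflight inserts is too high
        while (runningInsertCount.get() >= concurrentInsertLimit) {
            Thread.sleep(concurrentInsertSleepTime);
        }

        runningInsertCount.incrementAndGet();
        ResultSetFuture future = this.executeAsync(statement);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                runningInsertCount.decrementAndGet();
            }

            @Override
            public void onFailure(Throwable t) {
                runningInsertCount.decrementAndGet();
                concurrentInsertErrorOccured = true;
            }
        }, MoreExecutors.sameThreadExecutor());
    }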
Hope that helps.