Hadoop and Cassandra - InvalidRequestException(why:Column timestamp required) -
i have simple mapred job running on cassandra cluster, when tries save output table invalidrequestexception(why:column timestamp required).
i've tried manually adding 'timestamp' column cf doesnt make difference.
here's description of cf (as interpreted cqlsh):
create table output_words ( key text primary key, "count" int, ) compact storage , bloom_filter_fp_chance=0.010000 , caching='keys_only' , comment='' , dclocal_read_repair_chance=0.000000 , gc_grace_seconds=864000 , read_repair_chance=0.100000 , replicate_on_write='true' , populate_io_cache_on_flush='false' , compaction={'class': 'sizetieredcompactionstrategy'} , compression={'sstable_compression': 'snappycompressor'}; i'm using pom hadoop-core v1.1.2 , cassandra-thrift v1.2.4 on top of cassandra v1.2.4
can suggest how around this?
additional info
im configuring job follows (only showing config relevant output):
job job = new job(getconf(), "wordcount"); job.setjarbyclass(testjob.class); job.setmapperclass(tokenizermapper.class); job.setreducerclass(reducertocassandra.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(intwritable.class); job.setoutputkeyclass(bytebuffer.class); job.setoutputvalueclass(list.class); job.setoutputformatclass(columnfamilyoutputformat.class); confighelper.setoutputcolumnfamily(job.getconfiguration(), _keyspace, output_column_family); confighelper.setoutputrpcport(job.getconfiguration(), _port); confighelper.setoutputinitialaddress(job.getconfiguration(), _host); confighelper.setoutputpartitioner(job.getconfiguration(), "org.apache.cassandra.dht.murmur3partitioner"); and reducer class:
public static class reducertocassandra extends reducer<text, intwritable, bytebuffer, list<mutation>> { public void reduce(text word, iterable<intwritable> values, context context) throws ioexception, interruptedexception { int sum = 0; (intwritable val : values) { sum += val.get(); } context.write(stringserializer.get().tobytebuffer(word.tostring()), collections.singletonlist(getmutation(word, sum))); } private static mutation getmutation(text word, int sum) { column c = new column(); c.name = stringserializer.get().tobytebuffer("count"); c.value = integerserializer.get().tobytebuffer(sum); c.timestamp = system.currenttimemillis() * 1000; mutation m = new mutation(); m.column_or_supercolumn = new columnorsupercolumn(); m.column_or_supercolumn.column = c; return m; } }
instead of
c.timestamp = system.currenttimemillis() * 1000;
can try this
c.settimestamp(system.currenttimemillis() * 1000)
Comments
Post a Comment