Hadoop and Cassandra - InvalidRequestException(why:Column timestamp required)


I have a simple MapReduce job running on a Cassandra cluster, but when it tries to save its output to a table I get InvalidRequestException(why:Column timestamp required).

I've tried manually adding a 'timestamp' column to the CF, but that doesn't make a difference.

Here's the description of my CF (as interpreted by cqlsh):

    CREATE TABLE output_words (
      key text PRIMARY KEY,
      "count" int
    ) WITH COMPACT STORAGE AND
      bloom_filter_fp_chance=0.010000 AND
      caching='KEYS_ONLY' AND
      comment='' AND
      dclocal_read_repair_chance=0.000000 AND
      gc_grace_seconds=864000 AND
      read_repair_chance=0.100000 AND
      replicate_on_write='true' AND
      populate_io_cache_on_flush='false' AND
      compaction={'class': 'SizeTieredCompactionStrategy'} AND
      compression={'sstable_compression': 'SnappyCompressor'};

In my POM I'm using hadoop-core v1.1.2 and cassandra-thrift v1.2.4, on top of Cassandra v1.2.4.

Can anyone suggest how to get around this?

Additional info

I'm configuring my job as follows (only showing the config relevant to the output):

    Job job = new Job(getConf(), "wordcount");

    job.setJarByClass(TestJob.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(ReducerToCassandra.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);

    job.setOutputFormatClass(ColumnFamilyOutputFormat.class);

    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), _keyspace, output_column_family);

    ConfigHelper.setOutputRpcPort(job.getConfiguration(), _port);
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), _host);
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner");

And my reducer class:

    public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(StringSerializer.get().toByteBuffer(word.toString()), Collections.singletonList(getMutation(word, sum)));
        }

        private static Mutation getMutation(Text word, int sum) {
            Column c = new Column();
            c.name = StringSerializer.get().toByteBuffer("count");
            c.value = IntegerSerializer.get().toByteBuffer(sum);
            c.timestamp = System.currentTimeMillis() * 1000;

            Mutation m = new Mutation();
            m.column_or_supercolumn = new ColumnOrSuperColumn();
            m.column_or_supercolumn.column = c;
            return m;
        }
    }

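Instead of

    c.timestamp = System.currentTimeMillis() * 1000;

you can try this:

    c.setTimestamp(System.currentTimeMillis() * 1000);

In Thrift-generated Java classes the setter also marks the field as set. Assigning the public field directly leaves that flag unset, so the timestamp is most likely never sent to the server, which would explain the "Column timestamp required" error.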
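To illustrate why the setter might matter, here is a minimal sketch of how Thrift-generated Java structs typically track primitive fields. The class `ModelColumn` and its `serializedFields()` method are hypothetical stand-ins written for this example, not the real `org.apache.cassandra.thrift.Column`; the assumption is that the real class writes an optional field only when its "is set" flag is true.

```java
import java.util.ArrayList;
import java.util.List;

public class IssetDemo {
    /** Simplified, hypothetical model of a Thrift-generated struct (not the real Cassandra Column). */
    static class ModelColumn {
        public long timestamp;           // public field, assignable directly
        private boolean timestampIsSet;  // generated code usually keeps this in an "isset" bitfield

        public ModelColumn setTimestamp(long timestamp) {
            this.timestamp = timestamp;
            this.timestampIsSet = true;  // the setter records that the field now has a value
            return this;
        }

        public boolean isSetTimestamp() {
            return timestampIsSet;
        }

        /** Mimics serialization: an optional field is emitted only when it is flagged as set. */
        public List<String> serializedFields() {
            List<String> out = new ArrayList<>();
            if (isSetTimestamp()) {
                out.add("timestamp=" + timestamp);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() * 1000; // microseconds, as in the question

        ModelColumn direct = new ModelColumn();
        direct.timestamp = now;                       // flag stays false
        System.out.println(direct.serializedFields().isEmpty()); // prints true

        ModelColumn viaSetter = new ModelColumn();
        viaSetter.setTimestamp(now);                  // flag becomes true
        System.out.println(viaSetter.serializedFields().size()); // prints 1
    }
}
```

If the real class behaves like this model, using the setters for `name` and `value` as well keeps the code consistent, although only primitive fields such as the timestamp should depend on the flag.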
