  • Avro job

    Hi all,

    I am trying to set up a job that takes Avro files as input and produces an Avro file.
    I managed to read Avro records in my Mapper (extending org.apache.hadoop.mapreduce.Mapper, so the new API).

    My problem is that I have no clue how to write to the context and then read in org.apache.hadoop.mapreduce.Reducer (and then write again).
    I also have no clue how to bootstrap the <hadoop:job/> configuration for Avro.

    Is it possible to have a small sample of how to build a simple Avro job in spring-hadoop?
    This would be the first sample I have found of an Avro job using the new mapreduce (not mapred) API, with or without Spring...
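    What I am trying to end up with on the Mapper/Reducer side is roughly this (a sketch only; the getId() accessor and the way the output record is built are illustrative):
    Code:
    	import java.io.IOException;
    	import org.apache.avro.mapred.AvroKey;
    	import org.apache.avro.mapred.AvroValue;
    	import org.apache.hadoop.io.LongWritable;
    	import org.apache.hadoop.io.NullWritable;
    	import org.apache.hadoop.mapreduce.Mapper;
    	import org.apache.hadoop.mapreduce.Reducer;
    	import com.c4_soft.hadoop.avro.SerializableCustomer;
    	import com.c4_soft.hadoop.avro.SerializableFullBill;

    	// Mapper: unwrap the AvroKey, then write the record to the context wrapped in an AvroValue
    	public class CustomerIdAvroMapper
    			extends Mapper<AvroKey<SerializableCustomer>, NullWritable, LongWritable, AvroValue<SerializableCustomer>> {
    		@Override
    		protected void map(AvroKey<SerializableCustomer> key, NullWritable value, Context context)
    				throws IOException, InterruptedException {
    			SerializableCustomer customer = key.datum();
    			// getId() stands for any long field of the record
    			context.write(new LongWritable(customer.getId()), new AvroValue<SerializableCustomer>(customer));
    		}
    	}

    	// Reducer (separate file): unwrap the AvroValues, build the output record, write it back as an AvroKey
    	public class CustomerIdAvroReducer
    			extends Reducer<LongWritable, AvroValue<SerializableCustomer>, AvroKey<SerializableFullBill>, NullWritable> {
    		@Override
    		protected void reduce(LongWritable key, Iterable<AvroValue<SerializableCustomer>> values, Context context)
    				throws IOException, InterruptedException {
    			SerializableFullBill bill = new SerializableFullBill();
    			for (AvroValue<SerializableCustomer> value : values) {
    				SerializableCustomer customer = value.datum();
    				// ... fill the bill from the customer records (illustrative) ...
    			}
    			context.write(new AvroKey<SerializableFullBill>(bill), NullWritable.get());
    		}
    	}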

    EDIT: I should have provided my Spring config (at least I wouldn't have lost track of what makes the Mapper read from the Avro file):
    Code:
    	<!-- com.c4_soft.hadoop.CustomerIdAvroMapper extends Mapper<AvroKey<SerializableCustomer>, NullWritable, LongWritable, AvroValue<SerializableCustomer>> -->
    	<!-- com.c4_soft.hadoop.CustomerIdAvroReducer extends Reducer<LongWritable, AvroValue<SerializableCustomer>, AvroKey<SerializableFullBill>, NullWritable> -->
    	<hadoop:job
    		id="lab5"
    		jar-by-class="com.c4_soft.hadoop.CustomerIdAvroMapper"
    		mapper="com.c4_soft.hadoop.CustomerIdAvroMapper" 
    		map-key="org.apache.hadoop.io.LongWritable"
    		map-value="org.apache.avro.mapred.AvroValue"
    		reducer="com.c4_soft.hadoop.CustomerIdAvroReducer"
    		input-path="${path.customers.input}"
    		input-format="org.apache.avro.mapreduce.AvroKeyInputFormat"
    		output-path="${path.customers.output}"
    		output-format="org.apache.avro.mapreduce.AvroKeyOutputFormat">
    		avro.schema.input.key=com.c4_soft.hadoop.avro.SerializableCustomer.SCHEMA$
    		avro.schema.output.key=com.c4_soft.hadoop.avro.SerializableFullBill.SCHEMA$
    	</hadoop:job>
    When running that, I get the following (avro.schema.input.key and avro.schema.output.key are passed to Avro's schema parser as-is instead of being resolved to schema JSON; the 'c' in the error is the first letter of the package name):
    Code:
    org.apache.avro.SchemaParseException: org.codehaus.jackson.JsonParseException: Unexpected character ('c' (code 99)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
     at [Source: java.io.StringReader@1e47c8ac; line: 1, column: 2]
    	at org.apache.avro.Schema$Parser.parse(Schema.java:929)
    	at org.apache.avro.Schema$Parser.parse(Schema.java:917)
    	at org.apache.avro.Schema.parse(Schema.java:966)
    	at org.apache.avro.mapreduce.AvroJob.getInputKeySchema(AvroJob.java:142)
    	at org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
    	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.<init>(MapTask.java:489)
    	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:731)
    	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
    Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('c' (code 99)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
     at [Source: java.io.StringReader@1e47c8ac; line: 1, column: 2]
    	at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
    	at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
    	at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
    	at org.codehaus.jackson.impl.ReaderBasedParser._handleUnexpectedValue(ReaderBasedParser.java:630)
    	at org.codehaus.jackson.impl.ReaderBasedParser.nextToken(ReaderBasedParser.java:364)
    	at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2439)
    	at org.codehaus.jackson.map.ObjectMapper._readValue(ObjectMapper.java:2377)
    	at org.codehaus.jackson.map.ObjectMapper.readTree(ObjectMapper.java:1234)
    	at org.codehaus.jackson.map.ObjectMapper.readTree(ObjectMapper.java:1209)
    	at org.apache.avro.Schema$Parser.parse(Schema.java:927)
    	... 8 more
    Last edited by ch4mp; Mar 26th, 2013, 05:40 PM.

  • #2
    Some progress: using SpEL, I managed to set avro.schema.input.key and avro.schema.output.key, but I don't believe I'm on the right path.
    Those Strings are private AvroJob constants, and there is a good reason for that: each AvroJob.setXxx(job, schema) call sets several parameters.
    For instance, I need to call:
    Code:
    // from org.apache.avro.mapreduce.AvroJob
    public static void setMapOutputValueSchema(Job job, Schema schema) {
        job.setMapOutputValueClass(AvroValue.class);
        AvroSerialization.setValueWriterSchema(job.getConfiguration(), schema);
        AvroSerialization.setValueReaderSchema(job.getConfiguration(), schema);
        AvroSerialization.addToConfiguration(job.getConfiguration());
      }
    This both sets the map output value class (so map-value="org.apache.avro.mapred.AvroValue" in the job definition might not be necessary) and registers the right serializer (after having configured it).

    So, what should I put in the Spring context file to have the Avro configuration public API (the org.apache.avro.mapreduce.AvroJob static methods) called?
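    For reference, the plain-Java driver (no Spring) I am trying to reproduce would look roughly like this (a sketch only; the driver class name is made up, the Mapper/Reducer/record classes are the ones from my config above):
    Code:
    	import org.apache.avro.mapreduce.AvroJob;
    	import org.apache.avro.mapreduce.AvroKeyInputFormat;
    	import org.apache.avro.mapreduce.AvroKeyOutputFormat;
    	import org.apache.hadoop.conf.Configuration;
    	import org.apache.hadoop.fs.Path;
    	import org.apache.hadoop.io.LongWritable;
    	import org.apache.hadoop.io.NullWritable;
    	import org.apache.hadoop.mapreduce.Job;
    	import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    	import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    	import com.c4_soft.hadoop.CustomerIdAvroMapper;
    	import com.c4_soft.hadoop.CustomerIdAvroReducer;
    	import com.c4_soft.hadoop.avro.SerializableCustomer;
    	import com.c4_soft.hadoop.avro.SerializableFullBill;

    	public class AvroDriverSketch {
    		public static void main(String[] args) throws Exception {
    			Job job = new Job(new Configuration(), "lab5");
    			job.setJarByClass(CustomerIdAvroMapper.class);
    			job.setMapperClass(CustomerIdAvroMapper.class);
    			job.setReducerClass(CustomerIdAvroReducer.class);
    			job.setInputFormatClass(AvroKeyInputFormat.class);
    			job.setOutputFormatClass(AvroKeyOutputFormat.class);
    			job.setMapOutputKeyClass(LongWritable.class);
    			job.setOutputValueClass(NullWritable.class);
    			FileInputFormat.addInputPath(job, new Path(args[0]));
    			FileOutputFormat.setOutputPath(job, new Path(args[1]));

    			// The AvroJob public API calls I would like the Spring config to trigger:
    			// each of them stores the schema in the job Configuration and, where needed,
    			// sets the matching key/value class and registers the Avro serialization.
    			AvroJob.setInputKeySchema(job, SerializableCustomer.SCHEMA$);
    			AvroJob.setMapOutputValueSchema(job, SerializableCustomer.SCHEMA$);
    			AvroJob.setOutputKeySchema(job, SerializableFullBill.SCHEMA$);

    			System.exit(job.waitForCompletion(true) ? 0 : 1);
    		}
    	}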
    Last edited by ch4mp; Mar 27th, 2013, 01:32 AM.



    • #3
      I finally found a solution. It would have been easier if I (and the 90 readers so far) had remembered our Spring dependency injection courses better.
      What I was asking for above (what should I put in the Spring context file to have the Avro configuration public API called?) is a good case for an InitializingBean. Here is the one I wrote:
      Code:
      package com.c4_soft.util.hadoop.avro;
      
      import org.apache.avro.generic.IndexedRecord;
      import org.apache.avro.mapreduce.AvroJob;
      import org.apache.hadoop.mapreduce.Job;
      import org.springframework.beans.factory.InitializingBean;
      import org.springframework.util.ClassUtils;
      
      /**
        * An InitializingBean that performs the AvroJob (mapreduce, not mapred API) initialization steps.<br>
        * Requires a Hadoop Job reference as constructor arg.<br>
       * @author ch4mp
       *
       */
      public class AvroJobInitializingBean implements InitializingBean {
      	private Job job;
      	private String avroInputKey = null;
      	private String avroInputValue = null;
      	private String avroMapOutputKey = null;
      	private String avroMapOutputValue = null;
      	private String avroOutputKey = null;
      	private String avroOutputValue = null;
      	private ClassLoader cl;
      
      	
      	/**
      	 * @param aJob a reference to the (avro) job to configure
      	 */
      	public AvroJobInitializingBean(Job aJob) {
      		this.job = aJob;
      	}
      	
      	@Override
      	public void afterPropertiesSet() throws Exception {
      		
      		if(avroInputKey != null) {
      			AvroJob.setInputKeySchema(job, resolveClass(avroInputKey).newInstance().getSchema());
      		}
      
      		if(avroInputValue != null) {
      			AvroJob.setInputValueSchema(job, resolveClass(avroInputValue).newInstance().getSchema());
      		}
      
      		if(avroMapOutputKey != null) {
      			AvroJob.setMapOutputKeySchema(job, resolveClass(avroMapOutputKey).newInstance().getSchema());
      		}
      
      		if(avroMapOutputValue != null) {
      			Class<? extends IndexedRecord> c = resolveClass(avroMapOutputValue);
      			IndexedRecord o = c.newInstance();
      			AvroJob.setMapOutputValueSchema(job, o.getSchema());
      		}
      
      		if(avroOutputKey != null) {
      			AvroJob.setOutputKeySchema(job, resolveClass(avroOutputKey).newInstance().getSchema());
      		}
      
      		if(avroOutputValue != null) {
      			AvroJob.setOutputValueSchema(job, resolveClass(avroOutputValue).newInstance().getSchema());
      		}
      	}
      	
      	public void setAvroInputKey(String avroInputKey) {
      		this.avroInputKey = avroInputKey;
      	}
      
      	public void setAvroInputValue(String avroInputValue) {
      		this.avroInputValue = avroInputValue;
      	}
      
      	public void setAvroMapOutputKey(String avroMapOutputKey) {
      		this.avroMapOutputKey = avroMapOutputKey;
      	}
      
      	public void setAvroMapOutputValue(String avroMapOutputValue) {
      		this.avroMapOutputValue = avroMapOutputValue;
      	}
      
      	public void setAvroOutputKey(String avroOutputKey) {
      		this.avroOutputKey = avroOutputKey;
      	}
      
      	public void setAvroOutputValue(String avroOutputValue) {
      		this.avroOutputValue = avroOutputValue;
      	}
      
      	@SuppressWarnings("unchecked")
      	protected Class<? extends IndexedRecord> resolveClass(String className) {
      		return (Class<? extends IndexedRecord>) ClassUtils.resolveClassName(className, getBeanClassLoader());
      	}
      
      	protected ClassLoader getBeanClassLoader() {
      		if(cl == null) {
      			cl = ClassUtils.getDefaultClassLoader();
      		}
      		return cl;
      	}
      
      	protected void setBeanClassLoader(ClassLoader beanClassLoader) {
      		this.cl = beanClassLoader;
      	}
      }
      And here is what a configuration using it looks like (the classes in com.c4_soft.hadoop.avro are Avro SpecificRecord subclasses):
      Code:
      	<hadoop:job
      		id="billsByProductIdIndexer"
      		jar-by-class="com.c4_soft.hadoop.BillByProductIdAvroMapper"
      		mapper="com.c4_soft.hadoop.BillByProductIdAvroMapper"
      		reducer="com.c4_soft.hadoop.BillByProductIdAvroReducer"
      		input-path="${path.bills}"
      		input-format="org.apache.avro.mapreduce.AvroKeyInputFormat"
      		output-path="${path.bills.byproductid}"
      		output-format="org.apache.avro.mapreduce.AvroKeyOutputFormat">
      	</hadoop:job>
      	
      	<bean class="com.c4_soft.util.hadoop.avro.AvroJobInitializingBean">
      		<constructor-arg ref="billsByProductIdIndexer"/>
      		<property name="avroInputKey" value="com.c4_soft.hadoop.avro.SerializableBill" />
      		<property name="avroMapOutputValue" value="com.c4_soft.hadoop.avro.SerializableBill" />
      		<property name="avroOutputKey" value="com.c4_soft.hadoop.avro.LongIndexedSerializableBill" />
      	</bean>
      	
      	<hadoop:job-runner id="billsByProductIdIndexer-runner" pre-action="cleanup-script" job-ref="billsByProductIdIndexer" run-at-startup="true"/>
      Last edited by ch4mp; Mar 30th, 2013, 01:08 PM.

