`

Hadoop-commons分析

 
阅读更多

 

hadoop的配置文件相关类 Configuration

所有大型的系统都有一套自己的配置系统或模块,用于方便系统扩展用,hadoop有自己独立的一套配置方式

采用XML文件,使用SAX解析

配置文件my-config.xml格式

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<property>
		<name>name</name>
		<value>Girls Generation</value>
		<final>true</final>
		<description>The boys~</description>
	</property>
</configuration>

 

可以加载多个配置文件如:

Configuration cfg = new Configuration();
cfg.addResource(new URL("http://mytest.com/hadoop.xml"));
cfg.addResource(new FileInputStream("/data0/test/hadoop.xml"));
cfg.addResource(new Path("hdfs://hadoop-test/data/test.xml"));
cfg.addResource("mytest.xml");

如果第一个配置文件f1.xml中的age字段是final的,则不会被第二个文件f2.xml中同名的元素覆盖;

反之则会覆盖

 

Configuration类的静态代码块中显示加载了hadoop相关的几个xml文件,都是通过类加载方式加载的

  static{
    //print deprecation warning if hadoop-site.xml is found in classpath
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
      cL = Configuration.class.getClassLoader();
    }
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");
  }

 

加载配置是延迟加载的,会优先加载hadoop相关的XML文件,然后才是自定义的XML文件

  private void loadResources(Properties properties,
                             ArrayList resources,
                             boolean quiet) {
    if(loadDefaults) {
      for (String resource : defaultResources) {
        loadResource(properties, resource, quiet);
      }
    
      //support the hadoop-site.xml as a deprecated case
      if(getResource("hadoop-site.xml")!=null) {
        loadResource(properties, "hadoop-site.xml", quiet);
      }
    }
    
    for (Object resource : resources) {
      loadResource(properties, resource, quiet);
    }
  }

 

配置文件支持表达式的方式

	<property>
		<name>hadoop.tmp.dir</name>
		<value>/data0/hadoop/tmp</value>
	</property>

	<property>
		<name>dir</name>
		<value>${hadoop.tmp.dir}/data</value>
	</property>

而表达式可以嵌套,${path1}又引用了${path2},path2又引用了${path3}

这个嵌套深度最多是20次

表达式可以写在配置文件中,也可以在启动时通过 -D 参数传入

 

此外还有一个接口Configurable,实现了这个接口的类都表示可以配置的

public interface Configurable {
  /** Set the configuration to be used by this object. */
  void setConf(Configuration conf);
  /** Return the configuration used by this object. */
  Configuration getConf();
}

 

 

 

 

hadoop的序列化

关于序列化有三种作用:

1.作为一种持久化格式,比如对象编码后存储到磁盘上

2.作为一种通信数据格式,将一个虚拟机上的对象通过网络传输到另一个虚拟机上

3.作为一种拷贝克隆机制,将对象序列化到内存中再反序列化读取

hadoop有自己的序列化机制,它主要用来解决1)和2)两种情况的,hadoop序列化使用方式

	public void run() throws IOException {
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		DataOutputStream dos = new DataOutputStream(baos);
		IntWritable iw = new IntWritable(9527);
		iw.write(dos);
		dos.close();
                System.out.println(new String(baos.toByteArray()));
      }

 

而IntWritable的write()函数很简单

  public void write(DataOutput out) throws IOException {
    out.writeInt(value);
  }

 

DataOutputStream的write实际上就是把int分别按位取然后跟0xFF做与运算,最后写入

    public final void writeInt(int v) throws IOException {
        out.write((v >>> 24) & 0xFF);
        out.write((v >>> 16) & 0xFF);
        out.write((v >>>  8) & 0xFF);
        out.write((v >>>  0) & 0xFF);
        incCount(4);
    }

  

 

 

Writable相关的类图


 

Writable的子类

 

序列化类中还有一个可变长度vint和vlong,vint具体实现是用vlong去做的,可变长度vlong可以有效节省空间 

可变长度vlong的写入源码

  public static void writeVLong(DataOutput stream, long i) throws IOException {
    if (i >= -112 && i <= 127) {
      stream.writeByte((byte)i);
      return;
    }
      
    int len = -112;
    if (i < 0) {
      i ^= -1L; // take one's complement'
      len = -120;
    }
      
    long tmp = i;
    while (tmp != 0) {
      tmp = tmp >> 8;
      len--;
    }
      
    stream.writeByte((byte)len);
      
    len = (len < -120) ? -(len + 120) : -(len + 112);
      
    for (int idx = len; idx != 0; idx--) {
      int shiftbits = (idx - 1) * 8;
      long mask = 0xFFL << shiftbits;
      stream.writeByte((byte)((i & mask) >> shiftbits));
    }
  }

  

可变长度vlong的读取源码

  public static long readVLong(DataInput stream) throws IOException {
    byte firstByte = stream.readByte();
    int len = decodeVIntSize(firstByte);
    if (len == 1) {
      return firstByte;
    }
    long i = 0;
    for (int idx = 0; idx < len-1; idx++) {
      byte b = stream.readByte();
      i = i << 8;
      i = i | (b & 0xFF);
    }
    return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
  }

  public static int decodeVIntSize(byte value) {
    if (value >= -112) {
      return 1;
    } else if (value < -120) {
      return -119 - value;
    }
    return -111 - value;
  }

  public static boolean isNegativeVInt(byte value) {
    return value < -120 || (value >= -112 && value < 0);
  }

 

hadoop针对java的基本类型,字符串,枚举,Writable,空值等提供了一个ObjectWritable类,可以写入多种类型,这个类也适用于远程过程调用(RPC) 

ObjectWritable#writObject源码,就是先写入这个类的名称,然后判断类中的变量是数组,枚举还是普通类型,然后再依次写入到流中

  public static void writeObject(DataOutput out, Object instance,
                                 Class declaredClass, 
                                 Configuration conf) throws IOException {

    if (instance == null) {                       // null
      instance = new NullInstance(declaredClass, conf);
      declaredClass = Writable.class;
    }

    UTF8.writeString(out, declaredClass.getName()); // always write declared

    if (declaredClass.isArray()) {                // array
      int length = Array.getLength(instance);
      out.writeInt(length);
      for (int i = 0; i < length; i++) {
        writeObject(out, Array.get(instance, i),
                    declaredClass.getComponentType(), conf);
      }
      
    } else if (declaredClass == String.class) {   // String
      UTF8.writeString(out, (String)instance);
      
    } else if (declaredClass.isPrimitive()) {     // primitive type

      if (declaredClass == Boolean.TYPE) {        // boolean
        out.writeBoolean(((Boolean)instance).booleanValue());
      } else if (declaredClass == Character.TYPE) { // char
        out.writeChar(((Character)instance).charValue());
      } else if (declaredClass == Byte.TYPE) {    // byte
        out.writeByte(((Byte)instance).byteValue());
      } else if (declaredClass == Short.TYPE) {   // short
        out.writeShort(((Short)instance).shortValue());
      } else if (declaredClass == Integer.TYPE) { // int
        out.writeInt(((Integer)instance).intValue());
      } else if (declaredClass == Long.TYPE) {    // long
        out.writeLong(((Long)instance).longValue());
      } else if (declaredClass == Float.TYPE) {   // float
        out.writeFloat(((Float)instance).floatValue());
      } else if (declaredClass == Double.TYPE) {  // double
        out.writeDouble(((Double)instance).doubleValue());
      } else if (declaredClass == Void.TYPE) {    // void
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }
    } else if (declaredClass.isEnum()) {         // enum
      UTF8.writeString(out, ((Enum)instance).name());
    } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
      UTF8.writeString(out, instance.getClass().getName());
      ((Writable)instance).write(out);

    } else {
      throw new IOException("Can't write: "+instance+" as "+declaredClass);
    }
  }

 

 

hadoop序列化框架

1.Avro

2.Thrift

3.Google protocol Buffer

hadoop自身的简单的序列化框架API(在org.apache.hadoop.io.serializer包中)的类图


 

 

 

 

 

参考

抽象工厂模式-与-工厂方法模式区别

《JAVA与模式》之抽象工厂模式

 

  • 大小: 11.5 KB
  • 大小: 35.8 KB
  • 大小: 45.5 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics