HDFS write流程与代码分析(Hadoop 2.0)

Write操作是HDFS最基本的操作之一,一般是先create一个文件然后write内容。这篇文章主要讲用户调用FileSystem APT向HDFS写入数据时,HDFS是怎么工作的。
1,client发起create请求。
通过DistributedFileSystem.create()方法创建文件,其实就是通过DistributedFileSystem内部wrapper的DFSClient向NameNode发送RPC请求。因为所有的元数据相关的FileSystem API都是转嫁到DFSClient上然后通过与NameNode的RPC完成的。
create操作生成了一个DFSOutputStream对象,而构造这个对象时首先是向NameNode发送了create RPC,然后构造并启动了DataStreamer线程,最后开启Lease定时Renew机制LeaseRenewer。
2,NameNode处理create请求,然后给client发送response。
Create RPC到达NameNode之后依次走过NameNodeRpcServer.create()->FSNamesystem.startFile()->startFileInt()->startFileInternal()
3,client向NameNode请求分配Block,并向多个DataNode建立pipeline传输数据。

HDFS文件以一个一个Block的形式写入HDFS,Client首先向NameNode发送addBlock RPC请求,NameNode返回一个LocatedBlock对象包含该Block的元数据信息和对应的三副本所在的DataNode。然后client向pipelien中的第一个DataNode发起TCP连接。


NameNode接到client端发送的addBlock RPC之后主要做了一下几件事:
(1) 确认该client是否持有该文件的lease。
(2) Commit上一个Block,同时把这个Block的元数据持久化到EditLog,Renew lease。
(3) 通过BlockManage分配相应副本数目的DataNode用以存放该块Block。
(4) 给client端返回LocatedBlock。

HDFS的每个Block的construction是由write pipeline完成的。HDFS的pipeline有不同的阶段,通过BlockConstructionStage来表示。开始的时候是BlockConstructionStage.PIPELINE_SETUP_CREATE阶段。数据是以packet为单位被push到pipeline中。在没有出错的情况下Block construction经历了三个阶段(如下图所示):setup,streaming,close。注意到在streaming阶段(也就是t1-t2这段时间内)packet的发送并不是收到上一个packet的ack之后才发送下一个packet,而是以一种滑动窗口的形式发送的。在最后的close阶段就要等到这个Block内所有packet的ack全部收到才能close这个pipeline了。

Write主要是通过DFSOutputStream完成的。DFSOutputStream是write操作的核心类,该类里面有两个内部类Packet和DataStreamer,同时该类继承自FSOutputSummer。

FSOutputSummer这个抽象类包装了与checksum相关操作的方法,所以DFSOutputStream这个输出流在写数据的同时会写checksum。DFSOutputStream接收数据是通过FSOutputSummer的write方法。

数据传输的单位是由内部类Packet定义的,一个Packet大小是64K,是由512bytes的chunk组成的,一个chunk对应一个checksum。而另一个内部类DataStreamer是向外写数据的操作逻辑,是整个write操作的核心。


下面重点介绍DataStreamer类。DataStreamer.run()方法:
数据以默认64M的block为单位传输,也就是说 只支持client端串行写,这个和GFS不一样,GFS支持atomic record append,也就是多个client对同一个文件进行追加写。因为GFS是通过其中的一个ChunkServer(DataNode)来coordinate各个record有序的。HDFS是通过建立pipeline来完成数据传输的。

在client端有两个队列dataQueue和ackQueue。dataQueue是待发送的数据包packet临时存放的地方,ackQueue是已经发送还没有收到ack的数据包临时存放的地方。很多操作是需要对这两个队列上锁(synchronized)的。整个写数据的过程是以Block为单位的,整个过程随着BlockConstructionStage的变化而变化。一个Block写完,重新建立新的pipeline,写入新的Block。

public void run() {
      long lastPacket = Time.now();
      while (!streamerClosed && dfsClient.clientRunning) {

        // if the Responder encountered an error, shutdown Responder
        if (hasError && response != null) {
          try {
            response.close();
            response.join();
            response = null;
          } catch (InterruptedException  e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
        }

        Packet one = null;

        try {
          // process datanode IO errors if any
          boolean doSleep = false;
          if (hasError && errorIndex>=0) {
            doSleep = processDatanodeError();
          }

          synchronized (dataQueue) {
            // wait for a packet to be sent.
            long now = Time.now();
            while ((!streamerClosed && !hasError && dfsClient.clientRunning
                && dataQueue.size() == 0 &&
                (stage != BlockConstructionStage.DATA_STREAMING ||
                 stage == BlockConstructionStage.DATA_STREAMING &&
                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
              //dataQueue还没有数据,同时离pipeline中下游节点的读超时还远,那就先休息会吧。
              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                 timeout : 1000;
              try {
                dataQueue.wait(timeout);
              } catch (InterruptedException  e) {
                DFSClient.LOG.warn("Caught exception ", e);
              }
              doSleep = false;
              now = Time.now();
            }
            //任何时候发现DataStreamer挂了,或者pipeline中出错了,或者这个client被关闭了,那么后面直接跳过,回到循环开始的时候处理错误。
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            // get packet to be sent.
            if (dataQueue.isEmpty()) {
              one = new Packet(checksum.getChecksumSize());  // heartbeat packet
            } else {
              one = dataQueue.getFirst(); // regular data packet
            }
          }
          assert one != null;

          // get new block from namenode.
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Allocating new block");
            }
            //如果是create文件,那么先向NameNode发送addBlock RPC,返回LocatedBlock中包含了三个DataNodeInfo;然后通过createBlockOutputStream()这个函数向第一个DN发送写命令,通过DN传导建立起pipeline。
            nodes = nextBlockOutputStream(src);
            //生成一个ResponseProcessor负责处理pipeline的response。一个64M的Block对应一个ResponseProcessor,而且所有Block也是串行传输的。然后pipeline进入DATA_STREAMING阶段。
            initDataStreaming();
          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Append to block " + block);
            }
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
          }
          //获取current packet的最后一字节在整个Block中的offset
          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
          if (lastByteOffsetInBlock > blockSize) {
            throw new IOException("BlockSize " + blockSize +
                " is smaller than data size. " +
                " Offset of packet in block " +
                lastByteOffsetInBlock +
                " Aborting file " + src);
          }

          if (one.lastPacketInBlock) {
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError &&
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                //如果ackQueue中还有待确认的packet,则循环等待直到这个Block中所有的packet都被确认了才退出循环。
                try {
                  // wait for acks to arrive from datanodes
                  dataQueue.wait(1000);
                } catch (InterruptedException  e) {
                  DFSClient.LOG.warn("Caught exception ", e);
                }
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            //这个Block中所有的packet都被ack了,那么这个BlockConstructionStage就发生变化了。一个Block对应一个pipeline,所以写Block的过程是从PIPELINE_CREATE到PIPELINE_CLOSE
            stage = BlockConstructionStage.PIPELINE_CLOSE;
          }

          // send the packet
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          //向刚才建立起来的pipeline中写packet
          try {
            one.writeTo(blockStream);
            blockStream.flush();
          } catch (IOException e) {
            errorIndex = 0;
            throw e;
          }
          lastPacket = Time.now();

          if (one.isHeartbeatPacket()) {  //heartbeat packet
          }

          // update bytesSent
          long tmpBytesSent = one.getLastByteOffsetBlock();
          if (bytesSent < tmpBytesSent) {
            bytesSent = tmpBytesSent;
          }

          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }

          // Is this block full?
          if (one.lastPacketInBlock) {
            // wait for the close packet has been acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError &&
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                dataQueue.wait(1000);// wait for acks to arrive from datanodes
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            //关闭ResponseProcessor,关闭pipeline中的输入输出流
            endBlock();
          }
          if (progress != null) { progress.progress(); }

          // This is used by unit test to trigger race conditions.
          if (artificialSlowdown != 0 && dfsClient.clientRunning) {
            Thread.sleep(artificialSlowdown);
          }
        } catch (Throwable e) {
          DFSClient.LOG.warn("DataStreamer Exception", e);
          if (e instanceof IOException) {
            setLastException((IOException)e);
          }
          hasError = true;
          if (errorIndex == -1) { // not a datanode error
            streamerClosed = true;
          }
        }
      }
      closeInternal();
    }

4,处在pipeline中的DataNode处理来自upstream的写操作。

在DataNode中,块数据的接收和发送主要是通过DataXceiver,这部分的网络传输没有采用RPC,而是传输的TCP连接。每个DataNode节点都会启动一个DataXceiverServer和多个DataXceiver,DataXceiverServer相当于TCP socket编程中的Listen线程,而DataXceiver相当于handle线程。

在DataXceiver的run()方法中,从pipeline的上游socket中读取操作内容(函数readOp()),然后处理这个操作(processOp())。对于WRITE_BLOCK这种操作,会调用DataXceiver.writeBlock()函数。这个函数做了如下几件大事:

(1)建立起与pipeline下游的TCP socket连接;

(2)给pipeline上游发送ack;

(3)生成BlockReceiver并通过BlockReceiver.receiveBlock()接收来自pipeline上游的数据,把数据发送给下游并存储在本地磁盘上。BlockReceiver内部包含有到downstream DN和本地磁盘的输出流。

这个函数首先生成PacketResponder接收并处理下游返回的response,同时将下游的ack转发给上游;然后通过循环调用receivePacket()这个函数接收数据包,发送到downstream DN并把packet持久化到本地磁盘(生成Block文件和meta文件)。

关于这三个步骤执行的并发性和先后顺序在HDFS-append之后变得异常复杂,以后有机会再单独阐述吧。

当没有数据传输时,pipeline中各datanode通过heartbeat对连接keepalive。在整个pipeline(包括client和多个DataNode)的各台机器中用于互相通信的socket都配置了两个timeout值,分别是confTime和socketTimeout。confTime的值是WRITE_TIMEOUT,而socketTimeout的值是READ_TIMEOUT,即同一个socket的读写timeout是不同的。这样在整个pipeline的timeout值都是一致的了。

int getDatanodeWriteTimeout(int numNodes) {

    return (dfsClientConf.confTime > 0) ?

      (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;

  }

  int getDatanodeReadTimeout(int numNodes) {

    return dfsClientConf.socketTimeout > 0 ?

        (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes +

            dfsClientConf.socketTimeout) : 0;

  }

与write相关还有个重要的问题就是flush/sync问题,而且这个问题随着HDFS对append的支持而越发复杂。DFSOutputStream继承自FSOutputSummer,DFSOutputStream接收数据是通过FSOutputSummer的write方法,这个方法是synchronized。而sync方法的flushBuffer()和enqueuePacket()也是synchronized。也就是说对一个DFSOutputStream线程,如果sync和write同时调用将发生同步等待。常见的情况就是在HBase中写HLog时,每次有数据写入就要sync。(HLog中有个logSyncer,默认配置是每秒钟调用一次sync,不管有没有数据写入)sync发生的频率非常高,sync抢到锁的可能性很大。这样在不断的sync,不断的flushBuffer,但write写数据却被blocked了。这时就需要发送heartbeat给downstream的DataNode也表示该pipeline还是健康的。

参考资料:

http://kazman.shidler.hawaii.edu/ArchDoc.html

hadoop jira





One thought on “HDFS write流程与代码分析(Hadoop 2.0)

  1. Pingback: DataScientist's Lab » HDFS append原理与代码分析

发表评论

电子邮件地址不会被公开。 必填项已用 * 标注

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>

使用新浪微博登陆