淘先锋技术网


The collect phase in map (serialization):

During the map phase, collect is implemented mainly by MapOutputBuffer, which gathers the map output records.

Variable: BlockingBuffer bb

BlockingBuffer extends DataOutputStream; its out field points to the Buffer.
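The wrapper relationship can be sketched in a few lines. This is a minimal illustration, not Hadoop's code: SketchBuffer, SketchBlockingBuffer, and BlockingBufferDemo are hypothetical names, and the sketch leaves out the locking, wrapping, and spilling that the real Buffer performs.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the inner Buffer: an OutputStream that appends
// into a fixed byte array and tracks the next write position.
class SketchBuffer extends OutputStream {
    final byte[] kvbuffer;
    int bufindex = 0;

    SketchBuffer(int capacity) {
        kvbuffer = new byte[capacity];
    }

    @Override
    public void write(int b) throws IOException {
        if (bufindex >= kvbuffer.length) {
            throw new IOException("sketch buffer full");
        }
        kvbuffer[bufindex++] = (byte) b;
    }
}

// Hypothetical stand-in for BlockingBuffer: a DataOutputStream whose
// inherited `out` field is the SketchBuffer passed to the constructor.
class SketchBlockingBuffer extends DataOutputStream {
    SketchBlockingBuffer(SketchBuffer target) {
        super(target);
    }

    SketchBuffer target() {
        // `out` is the protected field inherited from FilterOutputStream.
        return (SketchBuffer) out;
    }
}

class BlockingBufferDemo {
    // Writes one int through the wrapper and returns how many bytes
    // reached the underlying array.
    static int writeIntAndCount(int value) {
        SketchBuffer buf = new SketchBuffer(16);
        SketchBlockingBuffer bb = new SketchBlockingBuffer(buf);
        try {
            bb.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bb.target().bufindex;
    }
}
```

Calling BlockingBufferDemo.writeIntAndCount(42) pushes the four bytes of the int through the DataOutputStream methods into the array owned by the wrapped stream.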

Initialization:

      serializationFactory = new SerializationFactory(job);

      keySerializer = serializationFactory.getSerializer(keyClass);

      keySerializer.open(bb);

      valSerializer = serializationFactory.getSerializer(valClass);

      valSerializer.open(bb);
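Because both serializers are opened on the same stream, key bytes and value bytes land back to back in one buffer. The sketch below illustrates that idea under stated assumptions: SketchSerializer and its two implementations are hypothetical and only modeled on the open/serialize calls above, and a ByteArrayOutputStream stands in for bb.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical minimal serializer contract (not Hadoop's actual interface).
interface SketchSerializer<T> {
    void open(OutputStream out);
    void serialize(T value);
}

// Writes strings in DataOutputStream's length-prefixed UTF format.
class StringSketchSerializer implements SketchSerializer<String> {
    private DataOutputStream data;

    public void open(OutputStream out) {
        data = new DataOutputStream(out);
    }

    public void serialize(String value) {
        try {
            data.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// Writes integers as four big-endian bytes.
class IntSketchSerializer implements SketchSerializer<Integer> {
    private DataOutputStream data;

    public void open(OutputStream out) {
        data = new DataOutputStream(out);
    }

    public void serialize(Integer value) {
        try {
            data.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class SharedStreamDemo {
    // Opens both serializers on one sink and returns the combined bytes.
    static byte[] serializePair(String key, int value) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        SketchSerializer<String> keySerializer = new StringSketchSerializer();
        SketchSerializer<Integer> valSerializer = new IntSketchSerializer();
        keySerializer.open(sink);
        valSerializer.open(sink);
        keySerializer.serialize(key);
        valSerializer.serialize(value);
        return sink.toByteArray();
    }
}
```

For example, SharedStreamDemo.serializePair("ab", 7) yields the key bytes followed directly by the value bytes in a single array.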

Serialization:

        int keystart = bufindex;

        keySerializer.serialize(key);

        if (bufindex < keystart) {

          // wrapped the key; reset required

          bb.reset();

          keystart = 0;

        }

        // serialize value bytes into buffer

        final int valstart = bufindex;

        valSerializer.serialize(value);

        int valend = bb.markRecord();
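The offset bookkeeping above (keystart, valstart, valend) can be reproduced on a plain stream. This is a simplified sketch, assuming a linear buffer that never wraps: DataOutputStream.size() stands in for bufindex, the final size stands in for the value returned by markRecord(), and RecordOffsetsDemo is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RecordOffsetsDemo {
    // Appends one key/value record to an empty stream and returns
    // {keystart, valstart, valend} as byte offsets into that stream.
    static int[] collect(String key, int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream bb = new DataOutputStream(bytes);
        try {
            int keystart = bb.size();   // position before the key is written
            bb.writeUTF(key);
            int valstart = bb.size();   // key ends here, value begins
            bb.writeInt(value);
            int valend = bb.size();     // position after the value
            return new int[] {keystart, valstart, valend};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With these three offsets, the key occupies [keystart, valstart) and the value occupies [valstart, valend). The wrap case handled by bb.reset() in the original code is deliberately not modeled here.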

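The serialized content passes through BlockingBuffer, which converts it to bytes and writes it into the Buffer; the bytes are actually stored in byte[] kvbuffer.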
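The write method that follows begins with a space check on the circular buffer. The sketch below isolates that check so it can be tried with concrete numbers. The two branches copy the conditions from the write method; the comments about which regions are free are one reading of those conditions, and SpaceCheckDemo is a hypothetical name.

```java
class SpaceCheckDemo {
    // Returns {buffull, wrap} for a pending write of `len` bytes.
    static boolean[] check(int bufstart, int bufend, int bufindex,
                           int bufvoid, int len) {
        boolean buffull;
        boolean wrap;
        if (bufstart <= bufend && bufend <= bufindex) {
            // Write position is at or past the data start: free space is
            // read here as the tail [bufindex, bufvoid) plus the head [0, bufstart).
            buffull = bufindex + len > bufvoid;            // does not fit in the tail
            wrap = (bufvoid - bufindex) + bufstart > len;  // tail + head is enough
        } else {
            // Write position is behind bufstart: free space is bounded by bufstart.
            wrap = false;
            buffull = bufindex + len > bufstart;
        }
        return new boolean[] {buffull, wrap};
    }
}
```

For a 100-byte buffer with bufstart = bufend = 10 and bufindex = 90, a 15-byte write gives buffull = true and wrap = true (10 tail bytes plus 10 head bytes exceed 15), while a 25-byte write gives buffull = true and wrap = false.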

     public synchronized void write(byte b[], int off, int len)

          throws IOException {

        boolean buffull = false;

        boolean wrap = false;

        spillLock.lock();

        try {

          do {

            if (sortSpillException != null) {

              throw (IOException)new IOException("Spill failed"

                  ).initCause(sortSpillException);

            }

            // sufficient buffer space?

            if (bufstart <= bufend && bufend <= bufindex) {

              buffull = bufindex + len > bufvoid;

              wrap = (bufvoid - bufindex) + bufstart > len;

            } else {

              // bufindex <= bufstart <= bufend

              // bufend <= bufindex <= bufstart

              wrap = false;

              buffull = bufindex + len > bufstart;

            }

            if (kvstart == kvend) {

              // spill thread not running

              if (kvend != kvindex) {

                // we have records we can spill

                final boolean bufsoftlimit = (bufindex > bufend)

                  ? bufindex - bufend > softBufferLimit

                  : bufend - bufindex < bufvoid -