The collect process in map (serialization):
During the map phase, collect relies mainly on MapOutputBuffer to gather the output data.
Variable: BlockingBuffer bb;
BlockingBuffer extends DataOutputStream. Its out field points to a Buffer.
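A minimal sketch of this layering, assuming a simplified setup: a DataOutputStream whose underlying out is a custom OutputStream that copies bytes into a fixed byte array standing in for kvbuffer. The class BufferLayeringSketch and its nested classes are hypothetical stand-ins named after the fields in these notes; they omit wrap-around, locking and spilling entirely.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class BufferLayeringSketch {
    // Hypothetical stand-in for kvbuffer: a fixed-size byte array.
    final byte[] kvbuffer;
    int bufindex = 0; // next write position in kvbuffer

    BufferLayeringSketch(int capacity) {
        kvbuffer = new byte[capacity];
    }

    // Plays the role of Buffer: the raw OutputStream that copies bytes
    // into kvbuffer. This sketch does not wrap or spill.
    class Buffer extends OutputStream {
        @Override
        public void write(int b) throws IOException {
            write(new byte[] {(byte) b}, 0, 1);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (bufindex + len > kvbuffer.length) {
                throw new IOException("buffer full (sketch does not spill)");
            }
            System.arraycopy(b, off, kvbuffer, bufindex, len);
            bufindex += len;
        }
    }

    // Plays the role of BlockingBuffer: a DataOutputStream whose `out`
    // is a Buffer, so writeInt/writeUTF/... end up in kvbuffer.
    static class BlockingBuffer extends DataOutputStream {
        BlockingBuffer(OutputStream out) {
            super(out);
        }
    }

    public static void main(String[] args) throws IOException {
        BufferLayeringSketch m = new BufferLayeringSketch(64);
        BlockingBuffer bb = new BlockingBuffer(m.new Buffer());
        bb.writeInt(42);    // 4 bytes, big-endian
        bb.writeUTF("key"); // 2-byte length prefix + 3 bytes
        System.out.println(m.bufindex); // 9
    }
}
```

Because DataOutputStream does no buffering of its own, every primitive write goes straight through Buffer into the array, which is why serializers only need to be handed bb.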
Initialization:
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
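To illustrate why both serializers are opened on the same bb, here is a sketch assuming a local interface that mirrors the open/serialize/close shape of Hadoop's Serializer. The interface, the two serializers and getSerializer below are hypothetical stand-ins, not the Hadoop classes; a ByteArrayOutputStream stands in for bb.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class SerializerWiringSketch {
    // Local stand-in with the same method shape as Hadoop's Serializer<T>.
    interface Serializer<T> {
        void open(OutputStream out) throws IOException;
        void serialize(T t) throws IOException;
        void close() throws IOException;
    }

    // Hypothetical serializer for String keys.
    static class StringSerializer implements Serializer<String> {
        private DataOutputStream out;
        public void open(OutputStream os) { out = new DataOutputStream(os); }
        public void serialize(String s) throws IOException { out.writeUTF(s); }
        public void close() throws IOException { out.close(); }
    }

    // Hypothetical serializer for Integer values.
    static class IntSerializer implements Serializer<Integer> {
        private DataOutputStream out;
        public void open(OutputStream os) { out = new DataOutputStream(os); }
        public void serialize(Integer i) throws IOException { out.writeInt(i); }
        public void close() throws IOException { out.close(); }
    }

    // Hypothetical factory: chooses a serializer by class, like getSerializer(keyClass).
    @SuppressWarnings("unchecked")
    static <T> Serializer<T> getSerializer(Class<T> c) {
        if (c == String.class) return (Serializer<T>) (Serializer<?>) new StringSerializer();
        if (c == Integer.class) return (Serializer<T>) (Serializer<?>) new IntSerializer();
        throw new IllegalArgumentException("no serializer for " + c);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bb = new ByteArrayOutputStream(); // stands in for BlockingBuffer
        Serializer<String> keySerializer = getSerializer(String.class);
        keySerializer.open(bb);
        Serializer<Integer> valSerializer = getSerializer(Integer.class);
        valSerializer.open(bb);

        keySerializer.serialize("word"); // 2 + 4 bytes
        valSerializer.serialize(1);      // 4 bytes
        System.out.println(bb.size());   // 10
    }
}
```

Both serializers append to one stream, so a key's bytes are immediately followed by its value's bytes, which is what lets collect record keystart, valstart and valend as plain offsets.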
Serialization:
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
// wrapped the key; reset required
bb.reset();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex;
valSerializer.serialize(value);
int valend = bb.markRecord();
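The check bufindex < keystart works because the buffer is circular: if the key's bytes run past the end of the array and continue at index 0, bufindex ends up smaller than where the key started. The sketch below assumes a simplified model: the hypothetical class KeyWrapSketch writes circularly, assumes bufmark equals keystart (the previous record ended there) and that the front of the array is already free. Its reset() only mimics the idea of making the key contiguous at index 0 and shrinking bufvoid; it is not Hadoop's reset.

```java
import java.nio.charset.StandardCharsets;

public class KeyWrapSketch {
    byte[] kvbuffer;
    int bufvoid;      // logical end of the buffer (initially the array length)
    int bufindex = 0; // next write position
    int bufmark = 0;  // end of the last complete record (assumed == keystart)

    KeyWrapSketch(int capacity) {
        kvbuffer = new byte[capacity];
        bufvoid = capacity;
    }

    // Circular write: on reaching bufvoid, continue at index 0.
    // Assumes the space is free (no spill or overflow handling).
    void write(byte[] b) {
        for (byte x : b) {
            if (bufindex == bufvoid) bufindex = 0;
            kvbuffer[bufindex++] = x;
        }
    }

    // Simplified reset: move the key so it is contiguous from index 0.
    void reset() {
        int headbytelen = bufvoid - bufmark; // key bytes written before wrapping
        bufvoid = bufmark;                   // bytes at or past here become unused
        if (bufindex + headbytelen > bufvoid) {
            throw new IllegalStateException("sketch only handles the easy case");
        }
        // shift the wrapped part [0, bufindex) right to make room
        System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
        // copy the key's head bytes to the front
        System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
        bufindex += headbytelen;
    }

    // Mirrors the key part of collect: detect the wrap, then reset.
    int collectKey(byte[] key) {
        int keystart = bufindex;
        write(key);
        if (bufindex < keystart) {
            // wrapped the key; reset required
            reset();
            keystart = 0;
        }
        return keystart;
    }

    public static void main(String[] args) {
        KeyWrapSketch m = new KeyWrapSketch(8);
        m.bufindex = 5; // pretend a 5-byte record was written and already spilled
        m.bufmark = 5;
        int keystart = m.collectKey("ABCDE".getBytes(StandardCharsets.US_ASCII));
        String key = new String(m.kvbuffer, keystart, 5, StandardCharsets.US_ASCII);
        System.out.println(keystart + " " + key + " " + m.bufvoid); // 0 ABCDE 5
    }
}
```

Keeping each key in one contiguous run is what allows keys to be compared later as raw byte ranges; the value, by contrast, may stay wrapped.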
BlockingBuffer converts the content into bytes and writes them into Buffer; the bytes themselves are stored in byte[] kvbuffer. The write method of Buffer:
public synchronized void write(byte b[], int off, int len)
    throws IOException {
  boolean buffull = false;
  boolean wrap = false;
  spillLock.lock();
  try {
    do {
      if (sortSpillException != null) {
        throw (IOException)new IOException("Spill failed").initCause(sortSpillException);
      }
      // sufficient buffer space?
      if (bufstart <= bufend && bufend <= bufindex) {
        buffull = bufindex + len > bufvoid;
        wrap = (bufvoid - bufindex) + bufstart > len;
      } else {
        // bufindex <= bufstart <= bufend
        // bufend <= bufindex <= bufstart
        wrap = false;
        buffull = bufindex + len > bufstart;
      }
      if (kvstart == kvend) {
        // spill thread not running
        if (kvend != kvindex) {
          // we have records we can spill
          final boolean bufsoftlimit = (bufindex > bufend)
            ? bufindex - bufend > softBufferLimit
            : bufend - bufindex < bufvoid - ...
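The "sufficient buffer space?" branch can be read as a pure function of the buffer pointers. The sketch below pulls just that test out of write(); the class SpaceCheckSketch and its result type are hypothetical. The reading assumed here: in the first case the unspilled data [bufstart, bufindex) does not wrap, so the free space is [bufindex, bufvoid) plus [0, bufstart); buffull means the bytes do not fit before bufvoid, and wrap means they would fit if writing continued at index 0. In the other case the data already wraps, so the only free space is [bufindex, bufstart) and wrapping again is not possible.

```java
public class SpaceCheckSketch {
    // Hypothetical result holder for the two flags computed in write().
    static final class SpaceCheck {
        final boolean buffull;
        final boolean wrap;

        SpaceCheck(boolean buffull, boolean wrap) {
            this.buffull = buffull;
            this.wrap = wrap;
        }

        @Override
        public String toString() {
            return "buffull=" + buffull + " wrap=" + wrap;
        }
    }

    // Same comparisons as the "sufficient buffer space?" block above.
    static SpaceCheck check(int bufstart, int bufend, int bufindex, int bufvoid, int len) {
        boolean buffull;
        boolean wrap;
        if (bufstart <= bufend && bufend <= bufindex) {
            // data not wrapped: free space is [bufindex, bufvoid) + [0, bufstart)
            buffull = bufindex + len > bufvoid;
            wrap = (bufvoid - bufindex) + bufstart > len;
        } else {
            // data already wraps: free space is only [bufindex, bufstart)
            wrap = false;
            buffull = bufindex + len > bufstart;
        }
        return new SpaceCheck(buffull, wrap);
    }

    public static void main(String[] args) {
        System.out.println(check(0, 0, 90, 100, 20));   // buffull=true wrap=false
        System.out.println(check(40, 40, 90, 100, 20)); // buffull=true wrap=true
        System.out.println(check(60, 80, 10, 100, 20)); // buffull=false wrap=false
    }
}
```

Under this reading, buffull with wrap still allows the write to continue at the front of the array, while buffull without wrap means there is no room until a spill frees space.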