Hadoop備忘:Reduce階段Iterablevalues中的每個值都共享一個對象
/**?
*?Iterate?through?the?values?for?the?current?key,?reusing?the?same?value??
*?object,?which?is?stored?in?the?context.?
*?@return?the?series?of?values?associated?with?the?current?key.?All?of?the??
*?objects?returned?directly?and?indirectly?from?this?method?are?reused.?
*/??
public???
Iterable?getValues()?throws?IOException,?InterruptedException?{??
return?iterable;??
} ?
在Reduce階段,具有相同key的的所有的value都會被組織到一起,形成一種key:values的形式。
一般情況下,我們會針對某個key下的所有的values進行處理,這里需要注意一個問題,當我們寫下如下代碼的時候:
protected?void?reduce(KEYIN?key,?Iterable?values,?Context?context??
)?throws?IOException,?InterruptedException?{??
for(VALUEIN?value:?values)?{??
context.write((KEYOUT)?key,?(VALUEOUT)?value);??
}??
} ?
我們在一個循環中,每次得到的value實際上都是指向的同一個對象,只是在每次迭代的時候,將新的值反序列化到這個對象中,以更新此對象的值:
/**?
*?Advance?to?the?next?key/value?pair.?
*/??
@Override??
public?boolean?nextKeyValue()?throws?IOException,?InterruptedException?{??
if?(!hasMore)?{??
key?=?null;??
value?=?null;??
return?false;??
}??
firstValue?=?!nextKeyIsSame;??
DataInputBuffer?nextKey?=?input.getKey();??
currentRawKey.set(nextKey.getData(),?nextKey.getPosition(),???
nextKey.getLength()?-?nextKey.getPosition());??
buffer.reset(currentRawKey.getBytes(),?0,?currentRawKey.getLength());??
key?=?keyDeserializer.deserialize(key);??
DataInputBuffer?nextVal?=?input.getValue();??
buffer.reset(nextVal.getData(),?nextVal.getPosition(),??
nextVal.getLength()?-?nextVal.getPosition());??
value?=?valueDeserializer.deserialize(value); ?
currentKeyLength?=?nextKey.getLength()?-?nextKey.getPosition();??
currentValueLength?=?nextVal.getLength()?-?nextVal.getPosition(); ?
hasMore?=?input.next();??
if?(hasMore)?{??
nextKey?=?input.getKey();??
nextKeyIsSame?=?comparator.compare(currentRawKey.getBytes(),?0,???
currentRawKey.getLength(),??
nextKey.getData(),??
nextKey.getPosition(),??
nextKey.getLength()?-?nextKey.getPosition()??
)?==?0;??
}?else?{??
nextKeyIsSame?=?false;??
}??
inputValueCounter.increment(1);??
return?true;??
} ?
為什么要注意這種情況呢?正如本文開始引用的那段Hadoop源代碼中的注釋所指明的,這里主要涉及到引用。如果值的類型是自己實現的某種Writable,比如說AType,并且在其中持有對其它對象的引用,同時,在Reduce階段還要對相鄰的兩個值(current_value,value)進行同時進行操作,這個時候,如果你僅僅是將value強制類型轉換成相應的AType,這個時候,current_value和value其實是指向的同一段內存空間,也就是說,當我們迭代完第一次的時候,current_value緩存了當前的值,但是當進行下一次迭代,取到的value,其實就是將它們共同指向的那段內存做了更新,換句話說,current_value所指向的內存也會被更新成value的值,如果不了解Reduce階段values的迭代實現,很可能就會造成莫名其妙的程序行為,而解決方案就是創建一個全新的對象來緩存“上一個”值,從而避免這種情況。
評論