非基本 MapReduce 模式
迭代消息傳遞 (圖處理)
問(wèn)題陳述:
假設(shè)一個(gè)實(shí)體網(wǎng)絡(luò),實(shí)體之間存在著關(guān)系。 需要按照與它比鄰的其他實(shí)體的屬性計(jì)算出一個(gè)狀態(tài)。這個(gè)狀態(tài)可以表現(xiàn)為它和其它節(jié)點(diǎn)之間的距離, 存在特定屬性的鄰接點(diǎn)的跡象, 鄰域密度特征等等。
解決方案:
網(wǎng)絡(luò)存儲(chǔ)為系列節(jié)點(diǎn)的結(jié)合,每個(gè)節(jié)點(diǎn)包含有其所有鄰接點(diǎn)ID的列表。按照這個(gè)概念,MapReduce 迭代進(jìn)行,每次迭代中每個(gè)節(jié)點(diǎn)都發(fā)消息給它的鄰接點(diǎn)。鄰接點(diǎn)根據(jù)接收到的信息更新自己的狀態(tài)。當(dāng)滿足了某些條件的時(shí)候迭代停止,如達(dá)到了最大迭代次數(shù)(網(wǎng)絡(luò)半徑)或兩次連續(xù)的迭代幾乎沒(méi)有狀態(tài)改變。從技術(shù)上來(lái)看,Mapper 以每個(gè)鄰接點(diǎn)的ID為鍵發(fā)出信息,所有的信息都會(huì)按照接受節(jié)點(diǎn)分組,reducer 就能夠重算各節(jié)點(diǎn)的狀態(tài)然后更新那些狀態(tài)改變了的節(jié)點(diǎn)。下面展示了這個(gè)算法:
class Mapper
method Map(id n, object N)
Emit(id n, object N)
for all id m in N.OutgoingRelations do
Emit(id m, message getMessage(N))
class Reducer
method Reduce(id m, [s1, s2,。..])
M = null
messages = []
for all s in [s1, s2,。..] do
if IsObject(s) then
M = s
else // s is a message
messages.add(s)
M.State = calculateState(messages)
Emit(id m, item M)
一個(gè)節(jié)點(diǎn)的狀態(tài)可以迅速的沿著網(wǎng)絡(luò)傳全網(wǎng),那些被感染了的節(jié)點(diǎn)又去感染它們的鄰居,整個(gè)過(guò)程就像下面的圖示一樣:
?
案例研究: 沿分類樹的有效性傳遞
問(wèn)題陳述:
這個(gè)問(wèn)題來(lái)自于真實(shí)的電子商務(wù)應(yīng)用。將各種貨物分類,這些類別可以組成一個(gè)樹形結(jié)構(gòu),比較大的分類(像男人、女人、兒童)可以再分出小分類(像男褲或女裝),直到不能再分為止(像男式藍(lán)色牛仔褲)。這些不能再分的基層類別可以是有效(這個(gè)類別包含有貨品)或者已無(wú)效的(沒(méi)有屬于這個(gè)分類的貨品)。如果一個(gè)分類至少含有一個(gè)有效的子分類那么認(rèn)為這個(gè)分類也是有效的。我們需要在已知一些基層分類有效的情況下找出分類樹上所有有效的分類。
解決方案:
這個(gè)問(wèn)題可以用上一節(jié)提到的框架來(lái)解決。我們咋下面定義了名為 getMessage和 calculateState 的方法:
class N
State in {True = 2, False = 1, null = 0},
initialized 1 or 2 for end-of-line categories, 0 otherwise
method getMessage(object N)
return N.State
method calculateState(state s, data [d1, d2,。..])
return max( [d1, d2,。..] )
案例研究:廣度優(yōu)先搜索
問(wèn)題陳述:需要計(jì)算出一個(gè)圖結(jié)構(gòu)中某一個(gè)節(jié)點(diǎn)到其它所有節(jié)點(diǎn)的距離。
解決方案: Source源節(jié)點(diǎn)給所有鄰接點(diǎn)發(fā)出值為0的信號(hào),鄰接點(diǎn)把收到的信號(hào)再轉(zhuǎn)發(fā)給自己的鄰接點(diǎn),每轉(zhuǎn)發(fā)一次就對(duì)信號(hào)值加1:
class N
State is distance,
initialized 0 for source node, INFINITY for all other nodes
method getMessage(N)
return N.State + 1
method calculateState(state s, data [d1, d2,。..])
min( [d1, d2,。..] )
案例研究:網(wǎng)頁(yè)排名和 Mapper 端數(shù)據(jù)聚合
這個(gè)算法由Google提出,使用權(quán)威的PageRank算法,通過(guò)連接到一個(gè)網(wǎng)頁(yè)的其他網(wǎng)頁(yè)來(lái)計(jì)算網(wǎng)頁(yè)的相關(guān)性。真實(shí)算法是相當(dāng)復(fù)雜的,但是核心思想是權(quán)重可以傳播,也即通過(guò)一個(gè)節(jié)點(diǎn)的各聯(lián)接節(jié)點(diǎn)的權(quán)重的均值來(lái)計(jì)算節(jié)點(diǎn)自身的權(quán)重。
class N
State is PageRank
method getMessage(object N)
return N.State / N.OutgoingRelations.size()
method calculateState(state s, data [d1, d2,。..])
return ( sum([d1, d2,。..]) )
要指出的是上面用一個(gè)數(shù)值來(lái)作為評(píng)分實(shí)際上是一種簡(jiǎn)化,在實(shí)際情況下,我們需要在Mapper端來(lái)進(jìn)行聚合計(jì)算得出這個(gè)值。下面的代碼片段展示了這個(gè)改變后的邏輯 (針對(duì)于 PageRank 算法):
class Mapper
method Initialize
H = new AssociativeArray
method Map(id n, object N)
p = N.PageRank / N.OutgoingRelations.size()
Emit(id n, object N)
for all id m in N.OutgoingRelations do
H{m} = H{m} + p
method Close
for all id n in H do
Emit(id n, value H{n})
class Reducer
method Reduce(id m, [s1, s2,。..])
M = null
p = 0
for all s in [s1, s2,。..] do
if IsObject(s) then
M = s
else
p = p + s
M.PageRank = p
Emit(id m, item M)
應(yīng)用:
圖分析,網(wǎng)頁(yè)索引
評(píng)論