Hostname mapping errors
Background:
After a batch of new Spark machines was added to the YARN cluster, some tasks in Spark jobs would hang indefinitely, and the driver reported nothing at all.
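Solution:
The container stderr log on a node with a stuck task showed that, while fetching block data from other nodes, the task could not connect to those machines and kept retrying.
We suspected that ops had missed updating /etc/hosts on some of the old nodes. Inspecting /etc/hosts confirmed that the new nodes' addresses were missing; after adding them, the problem went away.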
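Checking for missing entries by hand does not scale as nodes are added. A minimal sketch, assuming the full list of cluster hostnames is known and /etc/hosts uses the usual "IP name [aliases...]" layout with # comments; the class name and all hostnames are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HostsFileCheck {

    // Collect every hostname/alias mapped in /etc/hosts-style text.
    // Assumed format: "<ip> <canonical-name> [aliases...]", '#' starts a comment.
    static Set<String> mappedNames(String hostsContent) {
        Set<String> names = new HashSet<>();
        for (String line : hostsContent.split("\n")) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop the comment part
            }
            String[] fields = line.trim().split("\\s+");
            // fields[0] is the IP address; every later field is a name
            for (int i = 1; i < fields.length; i++) {
                names.add(fields[i].toLowerCase());
            }
        }
        return names;
    }

    // Return the cluster hostnames that have no entry in the given hosts content.
    static List<String> missingHosts(String hostsContent, List<String> clusterHosts) {
        Set<String> mapped = mappedNames(hostsContent);
        List<String> missing = new ArrayList<>();
        for (String host : clusterHosts) {
            if (!mapped.contains(host.toLowerCase())) {
                missing.add(host);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical /etc/hosts on an old node that was never updated
        String oldNodeHosts = String.join("\n",
            "127.0.0.1   localhost",
            "10.0.0.11   spark-old-01   # existing node",
            "10.0.0.12   spark-old-02");
        List<String> cluster = List.of("spark-old-01", "spark-old-02", "spark-new-01");
        System.out.println("Missing: " + missingHosts(oldNodeHosts, cluster));
        // prints "Missing: [spark-new-01]"
    }
}
```

Running this against the hosts file of every node (not just one) is the point: in this incident only some of the old nodes were out of date.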
As the cluster keeps growing, using DNS avoids errors caused by someone forgetting to update /etc/hosts on a few nodes.
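Whether names come from /etc/hosts or DNS, it is worth confirming on each node that every cluster hostname actually resolves. A minimal sketch using the JDK's InetAddress.getByName, which on Linux normally consults /etc/hosts and DNS as configured by the system; the class name and the default host list are hypothetical, and the JVM may cache lookups, so run it in a fresh process:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class ResolveCheck {

    // Resolve each hostname through the JDK's default resolver and
    // return the ones that fail with UnknownHostException.
    static List<String> unresolvable(List<String> hosts) {
        List<String> failed = new ArrayList<>();
        for (String host : hosts) {
            try {
                InetAddress addr = InetAddress.getByName(host);
                System.out.println(host + " -> " + addr.getHostAddress());
            } catch (UnknownHostException e) {
                failed.add(host);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Pass the real node list as arguments; "localhost" is only a placeholder
        List<String> hosts = args.length > 0 ? List.of(args) : List.of("localhost");
        List<String> failed = unresolvable(hosts);
        System.out.println(failed.isEmpty() ? "all hosts resolve" : "unresolvable: " + failed);
    }
}
```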
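Root cause:
When reading shuffle data, local blocks are read from the local BlockManager, while remote blocks are fetched through BlockTransferService, which addresses the remote executor by hostname. If a node has no hostname-to-IP mapping for that host, the connection fails and RetryingBlockFetcher retries the fetch; if the retries keep failing, the error "Exception while beginning fetch ..." is logged and the blocks are reported to the listener as failed.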
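The retry decision can be illustrated in isolation. A simplified, synchronous sketch modeled on the shouldRetry check in Spark's RetryingBlockFetcher (retry only when the failure is an IOException or is caused by one, and retries remain); the real class is asynchronous, and in Spark the limits come from spark.shuffle.io.maxRetries (default 3) and spark.shuffle.io.retryWait (default 5s). The class, method names, and simulated host are hypothetical. Note that java.net.UnknownHostException is a subclass of IOException, so an unresolvable hostname counts as retryable:

```java
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;

public class RetrySketch {

    // Retry only on IOException (directly or as the cause) while retries remain.
    static boolean shouldRetry(Exception e, int retryCount, int maxRetries) {
        boolean isIOException = e instanceof IOException
            || (e.getCause() != null && e.getCause() instanceof IOException);
        return isIOException && retryCount < maxRetries;
    }

    // One initial attempt plus up to maxRetries retries, sleeping retryWaitMs between them.
    static <T> T fetchWithRetry(Callable<T> fetch, int maxRetries, long retryWaitMs)
            throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return fetch.call();
            } catch (Exception e) {
                if (!shouldRetry(e, retryCount, maxRetries)) {
                    throw e; // give up; Spark would call onBlockFetchFailure here
                }
                retryCount++;
                System.err.println("fetch failed, retry " + retryCount + "/" + maxRetries
                    + ": " + e);
                Thread.sleep(retryWaitMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Simulated remote fetch: the first two attempts fail as if the
        // (hypothetical) host were missing from /etc/hosts.
        String block = fetchWithRetry(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new UnknownHostException("spark-new-01");
            }
            return "shuffle_0_0_0";
        }, 3, 10);
        System.out.println(block + " fetched after " + attempts[0] + " attempts");
    }
}
```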
NettyBlockTransferService

```scala
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        val client = clientFactory.createClient(host, port)
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
      }
    }

    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
```
RetryingBlockFetcher

```java
private void fetchAllOutstanding() {
  // Start by retrieving our shared state within a synchronized block.
  String[] blockIdsToFetch;
  int numRetries;
  RetryingBlockFetchListener myListener;
  synchronized (this) {
    blockIdsToFetch = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
    numRetries = retryCount;
    myListener = currentListener;
  }

  // Now initiate the fetch on all outstanding blocks, possibly initiating a retry if that fails.
  try {
    fetchStarter.createAndStart(blockIdsToFetch, myListener);
  } catch (Exception e) {
    logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s",
      blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);

    if (shouldRetry(e)) {
      initiateRetry();
    } else {
      for (String bid : blockIdsToFetch) {
        listener.onBlockFetchFailure(bid, e);
      }
    }
  }
}
```
Reading from Kafka fails and keeps retrying
Cause:
A firewall blocked connections to port 9092.
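A quick way to confirm this kind of problem from the affected machine is a plain TCP connect with a timeout. A minimal sketch using java.net.Socket; the class name and the default broker hostname are hypothetical, and 9092 is the port from this incident:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Try to open a TCP connection within timeoutMs. A closed port is usually
    // refused quickly; a firewall that silently drops packets typically shows
    // up as a timeout instead. Unresolvable hosts also end up here.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            System.err.println(host + ":" + port + " unreachable: " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // "kafka-broker-01" is a placeholder; pass the real broker host as an argument
        String broker = args.length > 0 ? args[0] : "kafka-broker-01";
        System.out.println(broker + ":9092 reachable = " + canConnect(broker, 9092, 3000));
    }
}
```

If this fails from the executor nodes but succeeds from elsewhere, a firewall rule between them is the likely culprit.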
Summary:
For problems like these, the cause can usually be found in the executor logs.
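When executor logs are long, filtering them for connection errors narrows things down. A minimal sketch that scans a log file (for example a container's stderr, or output saved from yarn logs -applicationId <appId> when log aggregation is enabled) for a few patterns. The first pattern is the message logged by the Spark code above; the JDK exception names are illustrative choices, and the class name is hypothetical:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class LogScan {

    // Substrings worth looking for: the Spark fetch-failure message plus
    // common JDK networking exceptions (an illustrative, non-exhaustive list).
    static final List<String> PATTERNS = List.of(
        "Exception while beginning fetch",
        "java.net.UnknownHostException",
        "java.net.ConnectException",
        "java.net.SocketTimeoutException");

    // Return every line containing at least one pattern, in original order.
    static List<String> suspiciousLines(List<String> logLines) {
        List<String> hits = new ArrayList<>();
        for (String line : logLines) {
            for (String p : PATTERNS) {
                if (line.contains(p)) {
                    hits.add(line);
                    break; // one hit per line is enough
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: java LogScan <path-to-executor-log>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        suspiciousLines(lines).forEach(System.out::println);
    }
}
```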