1. 異常信息
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645)
at org.learn.StateWordCount$.main(StateWordCount.scala:50)
at org.learn.StateWordCount.main(StateWordCount.scala)
Caused by: TimerException{java.util.ConcurrentModificationException}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
at java.util.HashMap$KeyIterator.next(HashMap.java:1466)
at org.learn.function.WordCountProcessFunction.onTimer(WordCountProcessFunction.scala:43)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:239)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
... 7 more
報錯位置是 org.learn.function.WordCountProcessFunction.onTimer(WordCountProcessFunction.scala:43)
報錯原因是 java.util.ConcurrentModificationException
2. 代碼
package org.learn.function
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
/**
 * Keyed word-count function that accumulates a running `(word, count)` per word in
 * map state and wipes that state from a processing-time timer.
 *
 * `mapState` holds the running count per word. `timerState` records the timestamps
 * written when a word is first seen; `onTimer` prunes the entries that are already
 * in the past and then clears `mapState`.
 */
class WordCountProcessFunction extends KeyedProcessFunction[String, (String, Int), (String, Int)] {

  // Running (word, count) per word.
  private var mapState: MapState[String, (String, Int)] = _
  // Timestamps recorded when a word is first seen (key and value are the same timestamp).
  private var timerState: MapState[Long, Long] = _

  /** Obtains both map states from the runtime context. */
  override def open(parameters: Configuration): Unit = {
    val mapStateDesc = new MapStateDescriptor[String, (String, Int)]("valueStateDesc", classOf[String], classOf[(String, Int)])
    mapState = getRuntimeContext.getMapState(mapStateDesc)
    val timerStateDesc = new MapStateDescriptor[Long, Long]("timerStateDesc", classOf[Long], classOf[Long])
    timerState = getRuntimeContext.getMapState(timerStateDesc)
  }

  /**
   * Adds `value._2` to the count stored for the word `value._1`.
   *
   * When the word has no stored count yet, a processing-time timer is registered and
   * two timestamps are recorded in `timerState` before counting starts from zero.
   *
   * @param value the incoming `(word, increment)` pair
   * @param ctx   context used to reach the timer service
   * @param out   collector (nothing is emitted here)
   */
  override def processElement(value: (String, Int), ctx: KeyedProcessFunction[String, (String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
    val word = value._1
    val stored = mapState.get(word)
    val currentState: (String, Int) =
      if (stored == null) {
        // TTL time: deliberately set 30 seconds in the past (a historical timestamp).
        val ttlTime: Long = System.currentTimeMillis() - 30 * 1000
        ctx.timerService().registerProcessingTimeTimer(ttlTime)
        timerState.put(ttlTime, ttlTime)
        timerState.put(ttlTime - 10, ttlTime - 10)
        (word, 0)
      } else {
        stored
      }
    mapState.put(word, (currentState._1, currentState._2 + value._2))
  }

  /**
   * Prints every entry of `timerState`, drops the entries whose key lies in the past,
   * then clears `mapState`.
   *
   * Expired entries are removed through the iterator itself. Calling
   * `timerState.remove(key)` while iterating over `timerState.keys()` modifies the
   * underlying map behind the iterator's back and makes the following `next()` call
   * throw `java.util.ConcurrentModificationException`.
   *
   * @param timestamp the timestamp of the firing timer
   * @param ctx       timer context (unused)
   * @param out       collector (nothing is emitted here)
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Int), (String, Int)]#OnTimerContext, out: Collector[(String, Int)]): Unit = {
    // Read the clock once so every key is compared against the same instant.
    val now = System.currentTimeMillis()
    System.out.println("clear..." + " timestamp: " + timestamp + " currentTime: " + now + " timerState: ")
    val iter = timerState.keys().iterator()
    while (iter.hasNext) {
      val key = iter.next()
      System.out.println("key: " + key + " value: " + timerState.get(key))
      if (key < now) {
        // Remove via the iterator so it stays in sync with the map.
        iter.remove()
      }
    }
    mapState.clear()
  }
}
第 43 行:val key = iter.next()
錯誤原因:利用迭代器遍歷 map 時，如果同時調用 map.remove(Object key)
做移除操作，就會報 java.util.ConcurrentModificationException
異常。
改正方法:利用迭代器的 remove 方法 iter.remove()
做移除操作，則不會拋出該異常信息。
3. 源碼
以 HashMap 為例，看看源碼。
-
進入
java.util.HashMap.java
public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable { // HashMap的remove方法 public V remove(Object key) { Node<K,V> e; return (e = removeNode(hash(key), key, null, false, true)) == null ? null : e.value; } final Node<K,V> removeNode(int hash, Object key, Object value, boolean matchValue, boolean movable) { Node<K,V>[] tab; Node<K,V> p; int n, index; if ((tab = table) != null && (n = tab.length) > 0 && (p = tab[index = (n - 1) & hash]) != null) { Node<K,V> node = null, e; K k; V v; if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) node = p; else if ((e = p.next) != null) { if (p instanceof TreeNode) node = ((TreeNode<K,V>)p).getTreeNode(hash, key); else { do { if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) { node = e; break; } p = e; } while ((e = e.next) != null); } } if (node != null && (!matchValue || (v = node.value) == value || (value != null && value.equals(v)))) { if (node instanceof TreeNode) ((TreeNode<K,V>)node).removeTreeNode(this, tab, movable); else if (node == p) tab[index] = node.next; else p.next = node.next; ++modCount; --size; afterNodeRemoval(node); return node; } } return null; } final class KeyIterator extends HashIterator implements Iterator<K> { public final K next() { return nextNode().key; } } // 內(nèi)部類 abstract class HashIterator { Node<K,V> next; // next entry to return Node<K,V> current; // current entry int expectedModCount; // for fast-fail int index; // current slot HashIterator() { expectedModCount = modCount; Node<K,V>[] t = table; current = next = null; index = 0; if (t != null && size > 0) { // advance to first entry do {} while (index < t.length && (next = t[index++]) == null); } } public final boolean hasNext() { return next != null; } final Node<K,V> nextNode() { Node<K,V>[] t; Node<K,V> e = next; if (modCount != expectedModCount) throw new ConcurrentModificationException(); if (e == null) throw new NoSuchElementException(); if 
((next = (current = e).next) == null && (t = table) != null) { do {} while (index < t.length && (next = t[index++]) == null); } return e; } // 迭代器的remove方法 public final void remove() { Node<K,V> p = current; if (p == null) throw new IllegalStateException(); if (modCount != expectedModCount) throw new ConcurrentModificationException(); current = null; K key = p.key; removeNode(hash(key), key, null, false, false); expectedModCount = modCount; } }
- 調用迭代器的 next() 方法，進而調用 nextNode() 方法
- 在 nextNode() 方法中會進行判斷，如果 modCount != expectedModCount，則拋出 java.util.ConcurrentModificationException 異常
- 如果調用 HashMap.remove() 方法，則進而會調用 removeNode() 方法，在 removeNode() 方法的最後，會對 modCount+1，此時後面再調用迭代器的 next() 方法時，就會拋出 java.util.ConcurrentModificationException 異常
- 如果調用迭代器的 remove() 方法，該方法最後會 expectedModCount = modCount，此時後面再調用迭代器的 next() 方法時，不會拋出異常
- 調用迭代器的