Start from AbstractProcessor and its onTrigger method:
public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {

    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        // Creates a StandardProcessSession
        final ProcessSession session = sessionFactory.createSession();
        try {
            // Delegate to the concrete processor's logic
            onTrigger(context, session);
            // checkpoint + commit
            session.commit();
        } catch (final Throwable t) {
            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
            // Undo everything done in this session, then rethrow
            session.rollback(true);
            throw t;
        }
    }

    // Implemented by each concrete Processor
    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}
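Execution first enters the concrete Processor's onTrigger() method. Inside it, the processor typically calls session.write() first and then session.transfer().
The write method builds the stream for the FlowFile content.
StandardFlowFileQueue: the queue's putAll method is where the transferred FlowFiles are enqueued.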
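The call order described above can be sketched with a small self-contained model. This is not NiFi code: SessionFlowSketch, MiniProcessor, MiniSession, and MiniQueue are hypothetical stand-ins, and the assumption that transferred items reach the queue's putAll only at commit time is a simplification of what StandardProcessSession does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are NiFi classes.
class SessionFlowSketch {
    public static void main(String[] args) {
        MiniQueue queue = new MiniQueue();
        MiniProcessor processor = session -> {
            session.write("hello");   // build the content first
            session.transfer();       // then mark it for the outgoing queue
        };
        trigger(processor, queue);
        System.out.println(queue.items());
    }

    // Same shape as AbstractProcessor.onTrigger: create a session, run the
    // processor, commit on success, roll back and rethrow on failure.
    static void trigger(MiniProcessor processor, MiniQueue queue) {
        MiniSession session = new MiniSession(queue);
        try {
            processor.onTrigger(session);
            session.commit();
        } catch (RuntimeException e) {
            session.rollback();
            throw e;
        }
    }
}

interface MiniProcessor {
    void onTrigger(MiniSession session);
}

class MiniQueue {
    private final List<String> items = new ArrayList<>();

    void putAll(List<String> batch) { items.addAll(batch); }

    List<String> items() { return items; }
}

class MiniSession {
    private final MiniQueue queue;
    private final List<String> pending = new ArrayList<>(); // transferred, not yet committed
    private String content;                                 // written, not yet transferred

    MiniSession(MiniQueue queue) { this.queue = queue; }

    void write(String data) { content = data; }

    void transfer() {
        if (content == null) throw new IllegalStateException("nothing written");
        pending.add(content);
        content = null;
    }

    // Only a commit makes the transferred items visible in the queue.
    void commit() {
        queue.putAll(pending);
        pending.clear();
    }

    // A rollback discards everything the session has accumulated.
    void rollback() {
        pending.clear();
        content = null;
    }
}
```

In this model a processor that throws after calling transfer() leaves the queue untouched, which is the point of wrapping the work in a session.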
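The scheduling loop starts at TimerDrivenSchedulingAgent->doSchedule() (timer-driven is only one of the scheduling strategies).
FlowController sets up the map from each scheduling strategy to its corresponding agent (the map is held by StandardProcessScheduler).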
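The strategy-to-agent lookup can be illustrated with a minimal sketch. All names here (SchedulerSketch, Strategy, Agent, TimerAgent, MiniScheduler, setAgent) are hypothetical and only mirror the idea; the real NiFi classes have more responsibilities and different signatures.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for illustration only; none of these are NiFi classes.
class SchedulerSketch {
    public static void main(String[] args) throws InterruptedException {
        MiniScheduler scheduler = new MiniScheduler();
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

        // The controller registers one agent per strategy up front.
        scheduler.setAgent(Strategy.TIMER_DRIVEN, new TimerAgent(pool, 10));

        CountDownLatch threeRuns = new CountDownLatch(3);
        scheduler.schedule(Strategy.TIMER_DRIVEN, threeRuns::countDown);
        boolean ran = threeRuns.await(2, TimeUnit.SECONDS);
        pool.shutdownNow();
        System.out.println("triggered three times: " + ran);
    }
}

enum Strategy { TIMER_DRIVEN, CRON_DRIVEN }

interface Agent {
    void doSchedule(Runnable task);
}

// Timer-driven strategy: re-run the task with a fixed delay between runs.
class TimerAgent implements Agent {
    private final ScheduledExecutorService pool;
    private final long periodMillis;

    TimerAgent(ScheduledExecutorService pool, long periodMillis) {
        this.pool = pool;
        this.periodMillis = periodMillis;
    }

    @Override
    public void doSchedule(Runnable task) {
        pool.scheduleWithFixedDelay(task, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
}

// Holds the strategy-to-agent map and dispatches to the matching agent.
class MiniScheduler {
    private final Map<Strategy, Agent> agents = new EnumMap<>(Strategy.class);

    void setAgent(Strategy strategy, Agent agent) { agents.put(strategy, agent); }

    void schedule(Strategy strategy, Runnable task) {
        Agent agent = agents.get(strategy);
        if (agent == null) throw new IllegalStateException("no agent for " + strategy);
        agent.doSchedule(task);
    }
}
```

Keeping the map in the scheduler means a new strategy only needs a new Agent implementation and one registration call; the dispatch code stays unchanged.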