1. Overview
Azkaban natively supports the following plugin types:
- command: Linux shell command-line jobs
- gobblin: a general-purpose data ingestion tool
- hadoopJava: runs Hadoop MapReduce jobs
- java: native Java jobs
- hive: executes Hive SQL
- pig: Pig script jobs
- spark: Spark jobs
- hdfsToTeradata: loads data from HDFS into Teradata
- teradataToHdfs: loads data from Teradata into HDFS
The simplest and most commonly used of these is the command type; the previous article already described how to write a command job. Here we therefore focus on four other commonly used types that Azkaban supports: java, hadoopJava, hive, and spark.
2. The java type
1) Write the code: MyJavaJob.java
package com.dataeye.java;
public class MyJavaJob {

    public static void main(String[] args) {
        System.out.println("#################################");
        System.out.println("#### MyJavaJob class exec... ###");
        System.out.println("#################################");
    }
}
2) Package it as a jar file: export the class as a jar with Maven or Eclipse.
3) Write the job file: java.job
type=javaprocess
classpath=./lib/*,${azkaban.home}/lib/*
java.class=com.dataeye.java.MyJavaJob
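Depending on the Azkaban version, the javaprocess type also accepts JVM sizing options such as Xms and Xmx; the values below are only illustrative, so check the jobtype documentation of your own installation before relying on them:
Xms=64M
Xmx=256M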
4) Assemble a complete runnable package
Create a new directory and a lib folder inside it, put the jar built in step 2 into lib, and place the job file at the same level as the lib folder, as sketched below:
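The resulting layout looks roughly like this (the jar file name is only an example):
java/
  java.job
  lib/
    my-java-job.jar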
5) Package it as a zip file
Pack the lib directory and the job file together into a single zip archive, e.g. java.zip.
6) Submit and run it. The process is the same as the steps described in the earlier article, so it is not repeated here; the result can be checked in the execution log.
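Given the println statements in MyJavaJob, the execution log should contain output along these lines:
#################################
#### MyJavaJob class exec... ###
#################################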
The output log shows that the code ran as expected.
That covers writing and running a java-type job. For the remaining types, only the code and the job file are described; the packaging and submission steps are the same as above.
3. The hadoopJava type
1) Prepare the data
The wordcount job below uses the following input file, input.txt:
1 Ross male 33 3674
2 Julie male 42 2019
3 Gloria female 45 3567
4 Carol female 36 2813
5 Malcolm male 42 2856
6 Joan female 22 2235
7 Niki female 27 3682
8 Betty female 20 3001
9 Linda male 21 2511
10 Whitney male 35 3075
11 Lily male 27 3645
12 Fred female 39 2202
13 Gary male 28 3925
14 William female 38 2056
15 Charles male 48 2981
16 Michael male 25 2606
17 Karl female 32 2260
18 Barbara male 39 2743
19 Elizabeth female 26 2726
20 Helen female 47 2457
21 Katharine male 45 3638
22 Lee female 43 3050
23 Ann male 35 2874
24 Diana male 37 3929
25 Fiona female 45 2955
26 Bob female 21 3382
27 John male 48 3677
28 Thomas female 22 2784
29 Dean male 38 2266
30 Paul female 31 2679
Copy input.txt into the /data/yann/input directory on HDFS.
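If you prefer to script this step rather than use the HDFS command-line client, a minimal sketch with the Hadoop FileSystem API could look like the following (the class name, local path, and fs.defaultFS value are illustrative assumptions; adjust them to your cluster):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed NameNode address, taken from the spark-submit example later in this article.
        conf.set("fs.defaultFS", "hdfs://de-hdfs");
        FileSystem fs = FileSystem.get(conf);
        // Copy the local input.txt into the HDFS input directory used by the wordcount job.
        fs.copyFromLocalFile(new Path("input.txt"), new Path("/data/yann/input/input.txt"));
        fs.close();
    }
}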
2) Prepare the code:
package azkaban.jobtype.examples.java;
import azkaban.jobtype.javautils.AbstractHadoopJob;
import azkaban.utils.Props;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;
public class WordCount extends AbstractHadoopJob {

    private static final Logger logger = Logger.getLogger(WordCount.class);

    private final String inputPath;
    private final String outputPath;
    private boolean forceOutputOverrite;

    public WordCount(String name, Props props) {
        super(name, props);
        // The paths and the overwrite flag come from the .job file properties.
        this.inputPath = props.getString("input.path");
        this.outputPath = props.getString("output.path");
        this.forceOutputOverrite = props.getBoolean("force.output.overwrite", false);
    }

    public void run() throws Exception {
        logger.info(String.format("Starting %s", getClass().getSimpleName()));
        JobConf jobconf = getJobConf();
        jobconf.setJarByClass(WordCount.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(IntWritable.class);
        jobconf.setMapperClass(Map.class);
        jobconf.setReducerClass(Reduce.class);
        jobconf.setInputFormat(TextInputFormat.class);
        jobconf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.addInputPath(jobconf, new Path(this.inputPath));
        FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));
        // Delete any existing output directory when force.output.overwrite=true.
        if (this.forceOutputOverrite) {
            FileSystem fs = FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);
            fs.delete(FileOutputFormat.getOutputPath(jobconf), true);
        }
        super.run();
    }

    // Tokenizes each input line and emits (word, 1) pairs.
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private long numRecords = 0L;

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                this.word.set(tokenizer.nextToken());
                output.collect(this.word, one);
                reporter.incrCounter(Counters.INPUT_WORDS, 1L);
            }
            if (++this.numRecords % 100L == 0L) {
                reporter.setStatus("Finished processing " + this.numRecords + " records from the input file");
            }
        }

        static enum Counters {
            INPUT_WORDS;
        }
    }

    // Sums the counts emitted for each word.
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
}
3) Write the job file
The wordcount.job file contains:
type=hadoopJava
job.extend=false
job.class=azkaban.jobtype.examples.java.WordCount
classpath=./lib/*,${azkaban.home}/lib/*
force.output.overwrite=true
input.path=/data/yann/input
output.path=/data/yann/output
That completes the hadoopJava job; package it and submit it to Azkaban to run.
4. The hive type
1) Write the hive.sql file
use azkaban;
INSERT OVERWRITE TABLE
user_table1 PARTITION (day_p='2017-02-08')
SELECT appid,uid,country,province,city
FROM user_table0 where adType=1;
This is a standard Hive SQL script: it first switches to the azkaban database, then inserts the data from user_table0 into the day_p='2017-02-08' partition of user_table1. The table definitions and data for user_table0 and user_table1 must be prepared in advance.
Once the script is written, put the file into the res folder.
2) Write the hive.job file
type=hive
user.to.proxy=azkaban
classpath=./lib/*,${azkaban.home}/lib/*
azk.hive.action=execute.query
hive.script=res/hive.sql
The key parameter is hive.script, which points to the SQL script to execute, here the hive.sql file under the res directory.
5. The spark type
A Spark job can be run in two ways: as a command-type job or as a spark-type job.
First, prepare the Spark job code:
package com.dataeye.template.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext}
object WordCount {

  def main(args: Array[String]) {
    // Two arguments are expected: the HDFS input file and a second test parameter.
    if (args.length < 2) {
      System.err.println("Usage: WordCount <hdfs_file> <param>")
      System.exit(1)
    }
    System.out.println("get first param ==> " + args(0))
    System.out.println("get second param ==> " + args(1))
    /** Spark 2.0 style:
      * val spark = SparkSession.builder().appName("WordCount").getOrCreate()
      */
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
    val spark = new SQLContext(sc)
    val file = spark.sparkContext.textFile(args(0))
    val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    // Collect the results to the driver and print them.
    wordCounts.collect().foreach(println _)
  }
}
Next, prepare the data; the data from the hadoopJava example above can be reused.
Finally, package the code as a jar file: spark-template-1.0-SNAPSHOT.jar
1) The command type
The command type is straightforward to configure; the spark.job file looks like this:
type=command
command=${spark.home}/bin/spark-submit --master yarn-cluster --class com.dataeye.template.spark.WordCount lib/spark-template-1.0-SNAPSHOT.jar hdfs://de-hdfs/data/yann/info.txt paramtest
2) The spark type
type=spark
master=yarn-cluster
execution-jar=lib/spark-template-1.0-SNAPSHOT.jar
class=com.dataeye.template.spark.WordCount
params=hdfs://de-hdfs/data/yann/info.txt paramtest
These are the commonly used job types that Azkaban supports.