I. Software Environment
I used the following software versions:
- Intellij Idea 2017.1
- Maven 3.3.9
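- A distributed Hadoop environment
II. Create a Maven Project
Open IDEA, choose File -> New -> Project, and select Maven in the left panel. (If you only want to run MapReduce, a plain Java project is enough and you do not need to check "Create from archetype"; check it if you want to create a web project or use an archetype.)
Set the GroupId and ArtifactId, then click Next.
Set the project location, then click Next.
After clicking Finish, the empty project layout looks like the figure below.
The complete project layout is shown in the figure below: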
III. Add Maven Dependencies
Add the dependencies to pom.xml. For Hadoop 2.7.3, the following jars are needed:
- hadoop-common
- hadoop-hdfs
- hadoop-mapreduce-client-core
- hadoop-mapreduce-client-jobclient
- log4j (for logging)
The dependencies in pom.xml are as follows:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
IV. Configure log4j
Add the log4j configuration file log4j.properties under src/main/resources, with the following content:
log4j.rootLogger = debug,stdout
### Output messages to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
V. Start Hadoop
Start Hadoop by running:
cd hadoop-2.7.3/
./sbin/start-all.sh
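Visit http://localhost:50070/ to check whether Hadoop started properly.
VI. Run WordCount (Reading a File from the Local File System)
Create an input folder under the project root, add dream.txt to it, and write a few words into the file: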
I have a dream
a dream
Create a package under src/main/java and add FileUtil.java with a function that deletes the output directory, so that it no longer has to be deleted by hand before each run. The content is as follows:
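package com.mrtest.hadoop;

import java.io.File;

/**
 * Created by bee on 3/25/17.
 */
public class FileUtil {
    /**
     * Recursively deletes the file or directory at the given path.
     *
     * @return true if the path existed and was deleted, false otherwise
     */
    public static boolean deleteDir(String path) {
        File dir = new File(path);
        if (!dir.exists()) {
            System.out.println("File (or directory) does not exist!");
            return false;
        }
        // listFiles() returns null when dir is a regular file
        File[] children = dir.listFiles();
        if (children != null) {
            for (File f : children) {
                if (f.isDirectory()) {
                    // Pass the full path; getName() would drop the parent directories
                    deleteDir(f.getPath());
                } else {
                    f.delete();
                }
            }
        }
        return dir.delete();
    }
}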
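For comparison, the same cleanup can be written with java.nio.file, which reports failures as exceptions instead of silently returning false. This is only a minimal sketch: the class name OutputCleaner and the method deleteRecursively are hypothetical and not part of the original project.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, shown only as an alternative to FileUtil.deleteDir.
class OutputCleaner {

    /**
     * Recursively deletes the given file or directory.
     * Returns false when the path does not exist, true once it has been removed.
     */
    static boolean deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return false;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order puts children before the directories that contain them.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // "output" is the job output directory used in this walkthrough.
        boolean deleted = deleteRecursively(Paths.get("output"));
        System.out.println(deleted ? "output removed" : "output did not exist");
    }
}
```

Calling it before submitting the job serves the same purpose as FileUtil.deleteDir("output").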
2. WordCount program implementation
1. Write the map method
package com.neusoft;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * This class serves as a map task. The four generic type parameters in the class declaration mean:
 *
 * KEYIN: by default, the starting offset of the line of text read by the MR framework, a Long;
 * Hadoop has its own more compact serialization interface, so LongWritable is used instead of Long
 * VALUEIN: by default, the content of the line of text read by the MR framework, a String; as above, Text is used
 * KEYOUT: the key of the output produced by the user-defined logic, here the word, a String; as above, Text is used
 * VALUEOUT: the value of the output produced by the user-defined logic, here the word count, an Integer; as above, IntWritable is used
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * The map-stage business logic goes in the custom map() method.
     * The map task calls our map() method once for each line of input.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // First convert the text passed in by the map task to a String
        String line = value.toString();
        // Split the line into words on spaces
        String[] words = line.split(" ");
        // Emit each word as <word, 1>
        for (String word : words) {
            // Use the word as the key and the count 1 as the value, so that the data is
            // partitioned by word and identical words reach the same reduce task
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
2. Write the reduce method
package com.neusoft;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * As with the Mapper, four generic type parameters are declared when extending the class.
 * KEYIN, VALUEIN correspond to the mapper's KEYOUT, VALUEOUT types.
 * KEYOUT, VALUEOUT are the output types of the custom reduce logic. Here KEYOUT is a single word and VALUEOUT is its total count.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    /**
     * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>
     * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
     * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
     * The key parameter is the key shared by one group of kv pairs for the same word
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count=0;
        for(IntWritable value : values){
            count += value.get();
        }
        context.write(key, new IntWritable(count)); // Emit the number of occurrences of each word
    }
}
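To see what the mapper and reducer compute together, the following plain-Java sketch mimics the three stages (map, group by key, reduce) in memory on the two lines of dream.txt. It uses no Hadoop classes; the class WordCountSimulation and its methods are hypothetical names chosen for illustration, and the in-memory grouping only approximates what the framework's shuffle does.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the WordCount data flow (no Hadoop involved).
class WordCountSimulation {

    /** Map step: split one line on spaces and emit a (word, 1) pair per word. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    /** Reduce step: sum all the values that were grouped under one key. */
    static int reduce(String key, List<Integer> values) {
        int count = 0;
        for (int v : values) {
            count += v;
        }
        return count;
    }

    /** Runs map on every line, groups the pairs by key, then reduces each group. */
    static Map<String, Integer> run(List<String> lines) {
        // Grouping stage: collect the values of identical keys; TreeMap keeps keys sorted.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // The two lines written to dream.txt earlier in this walkthrough.
        System.out.println(run(Arrays.asList("I have a dream", "a dream")));
        // prints {I=1, a=2, dream=2, have=1}
    }
}
```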
Explanation:
In the map() method, each word is written out as the key with 1 as the value, for example <hello,1>.
3. Write the driver class
package com.neusoft;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Acts as a client of the YARN cluster.
 * It packages the run-time parameters of our MR program, specifies the jar,
 * and finally submits the job to YARN.
 */
public class WordcountDriver {
    public static void main(String[] args) throws Exception {
        // Local Hadoop home on Windows; adjust to your own installation
        System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
        // Both the input path and the output path are required
        if (args == null || args.length < 2) {
            System.err.println("Usage: WordcountDriver <input path> <output path>");
            System.exit(2);
        }
        // By default this object reads the Hadoop configuration from the environment;
        // settings can also be overridden via set()
        Configuration conf = new Configuration();
        // A Job is the abstraction of a task in YARN
        Job job = Job.getInstance(conf);
        /*job.setJar("/home/hadoop/wc.jar");*/
        // Specify the local path of this program's jar
        job.setJarByClass(WordcountDriver.class);
        // Specify the Mapper/Reducer classes this job uses
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // Specify the kv types of the mapper output; they must match the Mapper's generic types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Specify the kv types of the final output; these are also the Reducer's key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Specify the directory containing the job's raw input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job's configuration and the jar containing its classes to YARN to run
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
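Here a String array (otherArgs) is added in the main function. If you would rather receive the arguments through main's args array and specify the input and output paths at run time, that works too: before running WordCount, edit the run Configuration and set Program arguments.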
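The two ways of supplying the paths can be combined: use the program arguments when they are given and fall back to a hard-coded array otherwise. The sketch below is only an illustration; the class JobArgs, the method resolve, and the default values (the input/dream.txt file and the output directory from this walkthrough) are assumptions, not code from the original project.

```java
// Hypothetical helper that chooses between program arguments and built-in defaults.
class JobArgs {

    // Assumed defaults: the local input file and output directory used in this walkthrough.
    static final String[] DEFAULT_ARGS = {"input/dream.txt", "output"};

    /** Returns {inputPath, outputPath}, preferring the program arguments when both are present. */
    static String[] resolve(String[] args) {
        if (args != null && args.length >= 2) {
            return new String[] {args[0], args[1]};
        }
        // Return a copy so callers cannot modify the defaults.
        return DEFAULT_ARGS.clone();
    }

    public static void main(String[] args) {
        String[] paths = resolve(args);
        System.out.println("input=" + paths[0] + " output=" + paths[1]);
    }
}
```

The driver could then pass paths[0] and paths[1] to FileInputFormat.setInputPaths and FileOutputFormat.setOutputPath in place of args[0] and args[1].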
VII. Run WordCount (Reading a File from HDFS)
Create a directory on HDFS:
hadoop fs -mkdir /worddir
如果出現(xiàn)Namenode安全模式導(dǎo)致的不能創(chuàng)建文件夾提示:
mkdir: Cannot create directory /worddir. Name node is in safe mode.
運(yùn)行以下命令關(guān)閉safe mode:
hadoop dfsadmin -safemode leave
上傳本地文件:
hadoop fs -put dream.txt /worddir
Modify the otherArgs parameter so that the input is the file's path on HDFS:
String[] otherArgs = new String[]{"hdfs://localhost:9000/worddir/dream.txt","output"};
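The HDFS input path is an ordinary URI, and its host and port need to match the NameNode address of your cluster (typically the value configured as fs.defaultFS in core-site.xml). As a quick sanity check, the parts can be pulled apart with java.net.URI; the class HdfsPathParts below is a hypothetical helper for illustration only.

```java
import java.net.URI;

// Hypothetical helper that splits an HDFS location into its URI components.
class HdfsPathParts {

    /** Describes the scheme, host, port and path of the given location. */
    static String describe(String location) {
        URI uri = URI.create(location);
        return "scheme=" + uri.getScheme()
                + " host=" + uri.getHost()
                + " port=" + uri.getPort()
                + " path=" + uri.getPath();
    }

    public static void main(String[] args) {
        System.out.println(describe("hdfs://localhost:9000/worddir/dream.txt"));
        // prints scheme=hdfs host=localhost port=9000 path=/worddir/dream.txt
    }
}
```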
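Fixing org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
This problem appeared out of nowhere: my Hadoop setup had been running normally, and then one day MapReduce jobs started failing with this error.
I checked the usual suspects, and all of them were already in place:
1. The PATH environment variable
2. hadoop.dll and winutils.exe in the Hadoop bin directory
3. hadoop.dll in c:\windows\system32
4. A 64-bit JDK
Since the error persisted even with all of these conditions met, the only option left was to modify the source code.
Download and unpack the source code of the matching version, find NativeIO.java, and add it to the project, as shown in the figure below.
Modify NativeIO.java so that Windows.access() returns true instead of calling the native access0() method.
Finally, rerun the program and it works.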
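Making access() always return true removes the permission check entirely. The Javadoc inside NativeIO itself notes that the native call could eventually be replaced by the JDK 7 methods Files#isReadable, isWritable and isExecutable. The standalone sketch below shows what such a check might look like; the class PortableAccessCheck and its own AccessRight enum are hypothetical and are not the Hadoop types, so this is an illustration of the idea rather than a drop-in patch.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical, Hadoop-independent access check based on java.nio.file.Files.
class PortableAccessCheck {

    // Simplified stand-in for the access rights that NativeIO.Windows distinguishes.
    enum AccessRight { READ, WRITE, EXECUTE }

    /** Returns true if the current process has the desired access to the path. */
    static boolean access(String path, AccessRight desired) {
        Path p = Paths.get(path);
        switch (desired) {
            case READ:
                return Files.isReadable(p);
            case WRITE:
                return Files.isWritable(p);
            case EXECUTE:
                return Files.isExecutable(p);
            default:
                throw new IllegalArgumentException("Unknown access right: " + desired);
        }
    }

    public static void main(String[] args) {
        String tmp = System.getProperty("java.io.tmpdir");
        System.out.println("temp dir readable: " + access(tmp, AccessRight.READ));
    }
}
```

Unlike the unconditional return true, this version still reports false for paths that do not exist or cannot be accessed.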
This problem occurs on Windows 7
Create a package named org.apache.hadoop.io.nativeio in your module.
Create a class named NativeIO in this package.
Then paste the following code:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.nativeio;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.PerformanceAdvisory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sun.misc.Unsafe;
import com.google.common.annotations.VisibleForTesting;
/**
* JNI wrappers for various native IO-related calls not available in Java.
* These functions should generally be used alongside a fallback to another
* more portable mechanism.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NativeIO {
public static class POSIX {
// Flags for open() call from bits/fcntl.h
public static final int O_RDONLY = 00;
public static final int O_WRONLY = 01;
public static final int O_RDWR = 02;
public static final int O_CREAT = 0100;
public static final int O_EXCL = 0200;
public static final int O_NOCTTY = 0400;
public static final int O_TRUNC = 01000;
public static final int O_APPEND = 02000;
public static final int O_NONBLOCK = 04000;
public static final int O_SYNC = 010000;
public static final int O_ASYNC = 020000;
public static final int O_FSYNC = O_SYNC;
public static final int O_NDELAY = O_NONBLOCK;
// Flags for posix_fadvise() from bits/fcntl.h
/* No further special treatment. */
public static final int POSIX_FADV_NORMAL = 0;
/* Expect random page references. */
public static final int POSIX_FADV_RANDOM = 1;
/* Expect sequential page references. */
public static final int POSIX_FADV_SEQUENTIAL = 2;
/* Will need these pages. */
public static final int POSIX_FADV_WILLNEED = 3;
/* Don't need these pages. */
public static final int POSIX_FADV_DONTNEED = 4;
/* Data will be accessed once. */
public static final int POSIX_FADV_NOREUSE = 5;
/* Wait upon writeout of all pages
in the range before performing the
write. */
public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
/* Initiate writeout of all those
dirty pages in the range which are
not presently under writeback. */
public static final int SYNC_FILE_RANGE_WRITE = 2;
/* Wait upon writeout of all pages in
the range after performing the
write. */
public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
private static final Log LOG = LogFactory.getLog(NativeIO.class);
private static boolean nativeLoaded = false;
private static boolean fadvisePossible = true;
private static boolean syncFileRangePossible = true;
static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
"hadoop.workaround.non.threadsafe.getpwuid";
static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = true;
private static long cacheTimeout = -1;
private static CacheManipulator cacheManipulator = new CacheManipulator();
public static CacheManipulator getCacheManipulator() {
return cacheManipulator;
}
public static void setCacheManipulator(CacheManipulator cacheManipulator) {
POSIX.cacheManipulator = cacheManipulator;
}
/**
* Used to manipulate the operating system cache.
*/
@VisibleForTesting
public static class CacheManipulator {
public void mlock(String identifier, ByteBuffer buffer,
long len) throws IOException {
POSIX.mlock(buffer, len);
}
public long getMemlockLimit() {
return NativeIO.getMemlockLimit();
}
public long getOperatingSystemPageSize() {
return NativeIO.getOperatingSystemPageSize();
}
public void posixFadviseIfPossible(String identifier,
FileDescriptor fd, long offset, long len, int flags)
throws NativeIOException {
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
len, flags);
}
public boolean verifyCanMlock() {
return NativeIO.isAvailable();
}
}
/**
* A CacheManipulator used for testing which does not actually call mlock.
* This allows many tests to be run even when the operating system does not
* allow mlock, or only allows limited mlocking.
*/
@VisibleForTesting
public static class NoMlockCacheManipulator extends CacheManipulator {
public void mlock(String identifier, ByteBuffer buffer,
long len) throws IOException {
LOG.info("mlocking " + identifier);
}
public long getMemlockLimit() {
return 1125899906842624L;
}
public long getOperatingSystemPageSize() {
return 4096;
}
public boolean verifyCanMlock() {
return true;
}
}
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
Configuration conf = new Configuration();
workaroundNonThreadSafePasswdCalls = conf.getBoolean(
WORKAROUND_NON_THREADSAFE_CALLS_KEY,
WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
initNative();
nativeLoaded = true;
cacheTimeout = conf.getLong(
CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) *
1000;
LOG.debug("Initialized cache for IDs to User/Group mapping with a " +
" cache timeout of " + cacheTimeout/1000 + " seconds.");
} catch (Throwable t) {
// This can happen if the user has an older version of libhadoop.so
// installed - in this case we can continue without native IO
// after warning
PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t);
}
}
}
/**
* Return true if the JNI-based native IO extensions are available.
*/
public static boolean isAvailable() {
return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
}
private static void assertCodeLoaded() throws IOException {
if (!isAvailable()) {
throw new IOException("NativeIO was not loaded");
}
}
/** Wrapper around open(2) */
public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
/** Wrapper around fstat(2) */
private static native Stat fstat(FileDescriptor fd) throws IOException;
/** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */
private static native void chmodImpl(String path, int mode) throws IOException;
public static void chmod(String path, int mode) throws IOException {
if (!Shell.WINDOWS) {
chmodImpl(path, mode);
} else {
try {
chmodImpl(path, mode);
} catch (NativeIOException nioe) {
if (nioe.getErrorCode() == 3) {
throw new NativeIOException("No such file or directory",
Errno.ENOENT);
} else {
LOG.warn(String.format("NativeIO.chmod error (%d): %s",
nioe.getErrorCode(), nioe.getMessage()));
throw new NativeIOException("Unknown error", Errno.UNKNOWN);
}
}
}
}
/** Wrapper around posix_fadvise(2) */
static native void posix_fadvise(
FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
/** Wrapper around sync_file_range(2) */
static native void sync_file_range(
FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
/**
* Call posix_fadvise on the given file descriptor. See the manpage
* for this syscall for more information. On systems where this
* call is not available, does nothing.
*
* @throws NativeIOException if there is an error with the syscall
*/
static void posixFadviseIfPossible(String identifier,
FileDescriptor fd, long offset, long len, int flags)
throws NativeIOException {
if (nativeLoaded && fadvisePossible) {
try {
posix_fadvise(fd, offset, len, flags);
} catch (UnsupportedOperationException uoe) {
fadvisePossible = false;
} catch (UnsatisfiedLinkError ule) {
fadvisePossible = false;
}
}
}
/**
* Call sync_file_range on the given file descriptor. See the manpage
* for this syscall for more information. On systems where this
* call is not available, does nothing.
*
* @throws NativeIOException if there is an error with the syscall
*/
public static void syncFileRangeIfPossible(
FileDescriptor fd, long offset, long nbytes, int flags)
throws NativeIOException {
if (nativeLoaded && syncFileRangePossible) {
try {
sync_file_range(fd, offset, nbytes, flags);
} catch (UnsupportedOperationException uoe) {
syncFileRangePossible = false;
} catch (UnsatisfiedLinkError ule) {
syncFileRangePossible = false;
}
}
}
static native void mlock_native(
ByteBuffer buffer, long len) throws NativeIOException;
/**
* Locks the provided direct ByteBuffer into memory, preventing it from
* swapping out. After a buffer is locked, future accesses will not incur
* a page fault.
*
* See the mlock(2) man page for more information.
*
* @throws NativeIOException
*/
static void mlock(ByteBuffer buffer, long len)
throws IOException {
assertCodeLoaded();
if (!buffer.isDirect()) {
throw new IOException("Cannot mlock a non-direct ByteBuffer");
}
mlock_native(buffer, len);
}
/**
* Unmaps the block from memory. See munmap(2).
*
* There isn't any portable way to unmap a memory region in Java.
* So we use the sun.nio method here.
* Note that unmapping a memory region could cause crashes if code
* continues to reference the unmapped code. However, if we don't
* manually unmap the memory, we are dependent on the finalizer to
* do it, and we have no idea when the finalizer will run.
*
* @param buffer The buffer to unmap.
*/
public static void munmap(MappedByteBuffer buffer) {
if (buffer instanceof sun.nio.ch.DirectBuffer) {
sun.misc.Cleaner cleaner =
((sun.nio.ch.DirectBuffer)buffer).cleaner();
cleaner.clean();
}
}
/** Linux only methods used for getOwner() implementation */
private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
private static native String getUserName(long uid) throws IOException;
/**
* Result type of the fstat call
*/
public static class Stat {
private int ownerId, groupId;
private String owner, group;
private int mode;
// Mode constants
public static final int S_IFMT = 0170000; /* type of file */
public static final int S_IFIFO = 0010000; /* named pipe (fifo) */
public static final int S_IFCHR = 0020000; /* character special */
public static final int S_IFDIR = 0040000; /* directory */
public static final int S_IFBLK = 0060000; /* block special */
public static final int S_IFREG = 0100000; /* regular */
public static final int S_IFLNK = 0120000; /* symbolic link */
public static final int S_IFSOCK = 0140000; /* socket */
public static final int S_IFWHT = 0160000; /* whiteout */
public static final int S_ISUID = 0004000; /* set user id on execution */
public static final int S_ISGID = 0002000; /* set group id on execution */
public static final int S_ISVTX = 0001000; /* save swapped text even after use */
public static final int S_IRUSR = 0000400; /* read permission, owner */
public static final int S_IWUSR = 0000200; /* write permission, owner */
public static final int S_IXUSR = 0000100; /* execute/search permission, owner */
Stat(int ownerId, int groupId, int mode) {
this.ownerId = ownerId;
this.groupId = groupId;
this.mode = mode;
}
Stat(String owner, String group, int mode) {
if (!Shell.WINDOWS) {
this.owner = owner;
} else {
this.owner = stripDomain(owner);
}
if (!Shell.WINDOWS) {
this.group = group;
} else {
this.group = stripDomain(group);
}
this.mode = mode;
}
@Override
public String toString() {
return "Stat(owner='" + owner + "', group='" + group + "'" +
", mode=" + mode + ")";
}
public String getOwner() {
return owner;
}
public String getGroup() {
return group;
}
public int getMode() {
return mode;
}
}
/**
* Returns the file stat for a file descriptor.
*
* @param fd file descriptor.
* @return the file descriptor file stat.
* @throws IOException thrown if there was an IO error while obtaining the file stat.
*/
public static Stat getFstat(FileDescriptor fd) throws IOException {
Stat stat = null;
if (!Shell.WINDOWS) {
stat = fstat(fd);
stat.owner = getName(IdCache.USER, stat.ownerId);
stat.group = getName(IdCache.GROUP, stat.groupId);
} else {
try {
stat = fstat(fd);
} catch (NativeIOException nioe) {
if (nioe.getErrorCode() == 6) {
throw new NativeIOException("The handle is invalid.",
Errno.EBADF);
} else {
LOG.warn(String.format("NativeIO.getFstat error (%d): %s",
nioe.getErrorCode(), nioe.getMessage()));
throw new NativeIOException("Unknown error", Errno.UNKNOWN);
}
}
}
return stat;
}
private static String getName(IdCache domain, int id) throws IOException {
Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
String name;
CachedName cachedName = idNameCache.get(id);
long now = System.currentTimeMillis();
if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
name = cachedName.name;
} else {
name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
if (LOG.isDebugEnabled()) {
String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
LOG.debug("Got " + type + " " + name + " for ID " + id +
" from the native implementation");
}
cachedName = new CachedName(name, now);
idNameCache.put(id, cachedName);
}
return name;
}
static native String getUserName(int uid) throws IOException;
static native String getGroupName(int uid) throws IOException;
private static class CachedName {
final long timestamp;
final String name;
public CachedName(String name, long timestamp) {
this.name = name;
this.timestamp = timestamp;
}
}
private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
new ConcurrentHashMap<Integer, CachedName>();
private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
new ConcurrentHashMap<Integer, CachedName>();
private enum IdCache { USER, GROUP }
public final static int MMAP_PROT_READ = 0x1;
public final static int MMAP_PROT_WRITE = 0x2;
public final static int MMAP_PROT_EXEC = 0x4;
public static native long mmap(FileDescriptor fd, int prot,
boolean shared, long length) throws IOException;
public static native void munmap(long addr, long length)
throws IOException;
}
private static boolean workaroundNonThreadSafePasswdCalls = false;
public static class Windows {
// Flags for CreateFile() call on Windows
public static final long GENERIC_READ = 0x80000000L;
public static final long GENERIC_WRITE = 0x40000000L;
public static final long FILE_SHARE_READ = 0x00000001L;
public static final long FILE_SHARE_WRITE = 0x00000002L;
public static final long FILE_SHARE_DELETE = 0x00000004L;
public static final long CREATE_NEW = 1;
public static final long CREATE_ALWAYS = 2;
public static final long OPEN_EXISTING = 3;
public static final long OPEN_ALWAYS = 4;
public static final long TRUNCATE_EXISTING = 5;
public static final long FILE_BEGIN = 0;
public static final long FILE_CURRENT = 1;
public static final long FILE_END = 2;
public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L;
/**
* Create a directory with permissions set to the specified mode. By setting
* permissions at creation time, we avoid issues related to the user lacking
* WRITE_DAC rights on subsequent chmod calls. One example where this can
* occur is writing to an SMB share where the user does not have Full Control
* rights, and therefore WRITE_DAC is denied.
*
* @param path directory to create
* @param mode permissions of new directory
* @throws IOException if there is an I/O error
*/
public static void createDirectoryWithMode(File path, int mode)
throws IOException {
createDirectoryWithMode0(path.getAbsolutePath(), mode);
}
/** Wrapper around CreateDirectory() on Windows */
private static native void createDirectoryWithMode0(String path, int mode)
throws NativeIOException;
/** Wrapper around CreateFile() on Windows */
public static native FileDescriptor createFile(String path,
long desiredAccess, long shareMode, long creationDisposition)
throws IOException;
/**
* Create a file for write with permissions set to the specified mode. By
* setting permissions at creation time, we avoid issues related to the user
* lacking WRITE_DAC rights on subsequent chmod calls. One example where
* this can occur is writing to an SMB share where the user does not have
* Full Control rights, and therefore WRITE_DAC is denied.
*
* This method mimics the semantics implemented by the JDK in
* {@link java.io.FileOutputStream}. The file is opened for truncate or
* append, the sharing mode allows other readers and writers, and paths
* longer than MAX_PATH are supported. (See io_util_md.c in the JDK.)
*
* @param path file to create
* @param append if true, then open file for append
* @param mode permissions of new directory
* @return FileOutputStream of opened file
* @throws IOException if there is an I/O error
*/
public static FileOutputStream createFileOutputStreamWithMode(File path,
boolean append, int mode) throws IOException {
long desiredAccess = GENERIC_WRITE;
long shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
long creationDisposition = append ? OPEN_ALWAYS : CREATE_ALWAYS;
return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(),
desiredAccess, shareMode, creationDisposition, mode));
}
/** Wrapper around CreateFile() with security descriptor on Windows */
private static native FileDescriptor createFileWithMode0(String path,
long desiredAccess, long shareMode, long creationDisposition, int mode)
throws NativeIOException;
/** Wrapper around SetFilePointer() on Windows */
public static native long setFilePointer(FileDescriptor fd,
long distanceToMove, long moveMethod) throws IOException;
/** Windows only methods used for getOwner() implementation */
private static native String getOwner(FileDescriptor fd) throws IOException;
/** Supported list of Windows access right flags */
public static enum AccessRight {
ACCESS_READ (0x0001), // FILE_READ_DATA
ACCESS_WRITE (0x0002), // FILE_WRITE_DATA
ACCESS_EXECUTE (0x0020); // FILE_EXECUTE
private final int accessRight;
AccessRight(int access) {
accessRight = access;
}
public int accessRight() {
return accessRight;
}
};
/** Windows only method used to check if the current process has requested
* access rights on the given path. */
private static native boolean access0(String path, int requestedAccess);
/**
* Checks whether the current process has desired access rights on
* the given path.
*
* Longer term this native function can be substituted with JDK7
* function Files#isReadable, isWritable, isExecutable.
*
* @param path input path
* @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE
* @return true if access is allowed
* @throws IOException I/O exception on error
*/
public static boolean access(String path, AccessRight desiredAccess)
throws IOException {
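// Workaround: always report access as allowed instead of calling the native access0().
// Note that this disables the permission check.
return true;
// return access0(path, desiredAccess.accessRight());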
}
/**
* Extends both the minimum and maximum working set size of the current
* process. This method gets the current minimum and maximum working set
* size, adds the requested amount to each and then sets the minimum and
* maximum working set size to the new values. Controlling the working set
* size of the process also controls the amount of memory it can lock.
*
* @param delta amount to increment minimum and maximum working set size
* @throws IOException for any error
* @see POSIX#mlock(ByteBuffer, long)
*/
public static native void extendWorkingSetSize(long delta) throws IOException;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
initNative();
nativeLoaded = true;
} catch (Throwable t) {
// This can happen if the user has an older version of libhadoop.so
// installed - in this case we can continue without native IO
// after warning
PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t);
}
}
}
}
private static final Log LOG = LogFactory.getLog(NativeIO.class);
private static boolean nativeLoaded = false;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
initNative();
nativeLoaded = true;
} catch (Throwable t) {
// This can happen if the user has an older version of libhadoop.so
// installed - in this case we can continue without native IO
// after warning
PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t);
}
}
}
/**
* Return true if the JNI-based native IO extensions are available.
*/
public static boolean isAvailable() {
return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
}
/** Initialize the JNI method ID and class ID cache */
private static native void initNative();
/**
* Get the maximum number of bytes that can be locked into memory at any
* given point.
*
* @return 0 if no bytes can be locked into memory;
* Long.MAX_VALUE if there is no limit;
* The number of bytes that can be locked into memory otherwise.
*/
static long getMemlockLimit() {
return isAvailable() ? getMemlockLimit0() : 0;
}
private static native long getMemlockLimit0();
/**
* @return the operating system's page size.
*/
static long getOperatingSystemPageSize() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe)f.get(null);
return unsafe.pageSize();
} catch (Throwable e) {
LOG.warn("Unable to get operating system page size. Guessing 4096.", e);
return 4096;
}
}
private static class CachedUid {
final long timestamp;
final String username;
public CachedUid(String username, long timestamp) {
this.timestamp = timestamp;
this.username = username;
}
}
private static final Map<Long, CachedUid> uidCache =
new ConcurrentHashMap<Long, CachedUid>();
private static long cacheTimeout;
private static boolean initialized = false;
/**
* The Windows logon name has two part, NetBIOS domain name and
* user account name, of the format DOMAIN\UserName. This method
* will remove the domain part of the full logon name.
*
* @param name the full principal name containing the domain
* @return name with domain removed
*/
private static String stripDomain(String name) {
int i = name.indexOf('\\');
if (i != -1)
name = name.substring(i + 1);
return name;
}
public static String getOwner(FileDescriptor fd) throws IOException {
ensureInitialized();
if (Shell.WINDOWS) {
String owner = Windows.getOwner(fd);
owner = stripDomain(owner);
return owner;
} else {
long uid = POSIX.getUIDforFDOwnerforOwner(fd);
CachedUid cUid = uidCache.get(uid);
long now = System.currentTimeMillis();
if (cUid != null && (cUid.timestamp + cacheTimeout) > now) {
return cUid.username;
}
String user = POSIX.getUserName(uid);
LOG.info("Got UserName " + user + " for UID " + uid
+ " from the native implementation");
cUid = new CachedUid(user, now);
uidCache.put(uid, cUid);
return user;
}
}
/**
* Create a FileInputStream that shares delete permission on the
* file opened, i.e. other process can delete the file the
* FileInputStream is reading. Only Windows implementation uses
* the native interface.
*/
public static FileInputStream getShareDeleteFileInputStream(File f)
throws IOException {
if (!Shell.WINDOWS) {
// On Linux the default FileInputStream shares delete permission
// on the file opened.
//
return new FileInputStream(f);
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened.
//
FileDescriptor fd = Windows.createFile(
f.getAbsolutePath(),
Windows.GENERIC_READ,
Windows.FILE_SHARE_READ |
Windows.FILE_SHARE_WRITE |
Windows.FILE_SHARE_DELETE,
Windows.OPEN_EXISTING);
return new FileInputStream(fd);
}
}
/**
* Create a FileInputStream that shares delete permission on the
* file opened at a given offset, i.e. other process can delete
* the file the FileInputStream is reading. Only Windows implementation
* uses the native interface.
*/
public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
throws IOException {
if (!Shell.WINDOWS) {
RandomAccessFile rf = new RandomAccessFile(f, "r");
if (seekOffset > 0) {
rf.seek(seekOffset);
}
return new FileInputStream(rf.getFD());
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened, and set it to the
// given offset.
//
FileDescriptor fd = NativeIO.Windows.createFile(
f.getAbsolutePath(),
NativeIO.Windows.GENERIC_READ,
NativeIO.Windows.FILE_SHARE_READ |
NativeIO.Windows.FILE_SHARE_WRITE |
NativeIO.Windows.FILE_SHARE_DELETE,
NativeIO.Windows.OPEN_EXISTING);
if (seekOffset > 0)
NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
return new FileInputStream(fd);
}
}
/**
* Create the specified File for write access, ensuring that it does not exist.
* @param f the file that we want to create
* @param permissions we want to have on the file (if security is enabled)
*
* @throws AlreadyExistsException if the file already exists
* @throws IOException if any other error occurred
*/
public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
throws IOException {
if (!Shell.WINDOWS) {
// Use the native wrapper around open(2)
try {
FileDescriptor fd = NativeIO.POSIX.open(f.getAbsolutePath(),
NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT
| NativeIO.POSIX.O_EXCL, permissions);
return new FileOutputStream(fd);
} catch (NativeIOException nioe) {
if (nioe.getErrno() == Errno.EEXIST) {
throw new AlreadyExistsException(nioe);
}
throw nioe;
}
} else {
// Use the Windows native APIs to create equivalent FileOutputStream
try {
FileDescriptor fd = NativeIO.Windows.createFile(f.getCanonicalPath(),
NativeIO.Windows.GENERIC_WRITE,
NativeIO.Windows.FILE_SHARE_DELETE
| NativeIO.Windows.FILE_SHARE_READ
| NativeIO.Windows.FILE_SHARE_WRITE,
NativeIO.Windows.CREATE_NEW);
NativeIO.POSIX.chmod(f.getCanonicalPath(), permissions);
return new FileOutputStream(fd);
} catch (NativeIOException nioe) {
if (nioe.getErrorCode() == 80) {
// ERROR_FILE_EXISTS
// 80 (0x50)
// The file exists
throw new AlreadyExistsException(nioe);
}
throw nioe;
}
}
}
private synchronized static void ensureInitialized() {
if (!initialized) {
cacheTimeout =
new Configuration().getLong("hadoop.security.uid.cache.secs",
4*60*60) * 1000;
LOG.info("Initialized cache for UID to User mapping with a cache" +
" timeout of " + cacheTimeout/1000 + " seconds.");
initialized = true;
}
}
/**
* A version of renameTo that throws a descriptive exception when it fails.
*
* @param src The source path
* @param dst The destination path
*
* @throws NativeIOException On failure.
*/
public static void renameTo(File src, File dst)
throws IOException {
if (!nativeLoaded) {
if (!src.renameTo(dst)) {
throw new IOException("renameTo(src=" + src + ", dst=" +
dst + ") failed.");
}
} else {
renameTo0(src.getAbsolutePath(), dst.getAbsolutePath());
}
}
public static void link(File src, File dst) throws IOException {
if (!nativeLoaded) {
HardLink.createHardLink(src, dst);
} else {
link0(src.getAbsolutePath(), dst.getAbsolutePath());
}
}
/**
* A version of renameTo that throws a descriptive exception when it fails.
*
* @param src The source path
* @param dst The destination path
*
* @throws NativeIOException On failure.
*/
private static native void renameTo0(String src, String dst)
throws NativeIOException;
private static native void link0(String src, String dst)
throws NativeIOException;
/**
* Unbuffered file copy from src to dst without tainting OS buffer cache
*
* In POSIX platform:
* It uses FileChannel#transferTo() which internally attempts
* unbuffered IO on OS with native sendfile64() support and falls back to
* buffered IO otherwise.
*
* It minimizes the number of FileChannel#transferTo calls by passing the
* src file size directly instead of a smaller size as the 3rd parameter.
* This reduces the number of sendfile64() system calls when native sendfile64()
* is supported. In the two fallback cases where sendfile is not supported,
* FileChannel#transferTo already has its own batching of size 8 MB and 8 KB,
* respectively.
*
* In Windows Platform:
* It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING
* flag, which is supported on Windows Server 2008 and above.
*
* Ideally, we should use FileChannel#transferTo() across both POSIX and Windows
* platform. Unfortunately, the wrapper(Java_sun_nio_ch_FileChannelImpl_transferTo0)
* used by FileChannel#transferTo for unbuffered IO is not implemented on Windows.
* Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0
* on Windows simply returns IOS_UNSUPPORTED.
*
* Note: This simple native wrapper does minimal parameter checking before copy and
* consistency check (e.g., size) after copy.
* It is recommended to use wrapper function like
* the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs with pre/post copy
* checks.
*
* @param src The source path
* @param dst The destination path
* @throws IOException
*/
public static void copyFileUnbuffered(File src, File dst) throws IOException {
if (nativeLoaded && Shell.WINDOWS) {
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
} else {
FileInputStream fis = null;
FileOutputStream fos = null;
FileChannel input = null;
FileChannel output = null;
try {
fis = new FileInputStream(src);
fos = new FileOutputStream(dst);
input = fis.getChannel();
output = fos.getChannel();
long remaining = input.size();
long position = 0;
long transferred = 0;
while (remaining > 0) {
transferred = input.transferTo(position, remaining, output);
remaining -= transferred;
position += transferred;
}
} finally {
IOUtils.cleanup(LOG, output);
IOUtils.cleanup(LOG, fos);
IOUtils.cleanup(LOG, input);
IOUtils.cleanup(LOG, fis);
}
}
}
private static native void copyFileUnbuffered0(String src, String dst)
throws NativeIOException;
}
After that, it should work.
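The non-Windows branch of copyFileUnbuffered in the class above comes down to a FileChannel#transferTo loop. Here is a minimal standalone sketch of that loop, in case it helps to see it outside of Hadoop. The class name TransferToCopySketch and the method copy are my own placeholders, not part of Hadoop, and I use try-with-resources instead of IOUtils.cleanup:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration only; not part of Hadoop.
class TransferToCopySketch {

    /** Copies src to dst with FileChannel#transferTo and returns the bytes copied. */
    static long copy(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst,
                     StandardOpenOption.CREATE,
                     StandardOpenOption.TRUNCATE_EXISTING,
                     StandardOpenOption.WRITE)) {
            long size = in.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so keep
            // asking for the remainder until the whole file is copied.
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
            return position;
        }
    }
}
```

Like the original, this asks for the whole remaining size on each call rather than a small fixed chunk, and it assumes the source file is not truncated while the copy is running.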
Running the program on a Hadoop cluster
1. Packaging plugin
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!-- Specify the main class; this setting ends up in the jar's manifest file -->
<mainClass>com.roadom.WordcountDriver</mainClass>
</manifest>
</archive>
<classesDirectory>
</classesDirectory>
</configuration>
</plugin>
</plugins>
1. Build the jar
2. Upload the jar (without the Hadoop libs) to the Hadoop cluster
D:\hadoopmr\out\artifacts\hadoopmr_jar\hadoopmr.jar
3. Run the command to launch the MR job
./hadoop jar hadoopmr.jar com.neusoft.WordCount /wc.input /out42
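Before running the command, it can be worth checking what the jar's manifest actually says. As far as I know, hadoop jar uses the manifest's Main-Class when one is present, and only treats the first argument after the jar as the class name when it is absent. In the first case the class name on the command line would be passed to the program as an ordinary argument. The sketch below reads that attribute with java.util.jar; the class name ManifestMainClassSketch is a placeholder of mine:

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper for illustration only.
class ManifestMainClassSketch {

    /** Returns the Main-Class attribute of the jar, or null if it has none. */
    static String mainClassOf(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return null; // jar was built without a manifest
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ManifestMainClassSketch hadoopmr.jar
        System.out.println("Main-Class: " + mainClassOf(Paths.get(args[0])));
    }
}
```

If it prints a class name, that is the value the maven-jar-plugin mainClass setting wrote into the manifest.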
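Problems you may run into
Add hadoop.dll and winutils.exe to the directory where Hadoop was extracted.
If double-clicking winutils.exe pops up an error, install DirectX_Repair_3.7_Enhanced_XiaZaiBa.zip to update the DLLs on your machine.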
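To confirm the two files ended up where Hadoop looks for them, a small check like the one below may help. It is only a sketch under two assumptions of mine: the files live in the bin subdirectory of the extracted Hadoop directory, and the HADOOP_HOME environment variable points at that directory. The class name WinutilsCheckSketch is a placeholder:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only.
class WinutilsCheckSketch {

    /** Returns the names of the Windows native files missing from hadoopHome/bin. */
    static List<String> missingNativeFiles(File hadoopHome) {
        // Assumption: both files are expected under the bin subdirectory.
        File bin = new File(hadoopHome, "bin");
        List<String> missing = new ArrayList<>();
        for (String name : new String[] {"winutils.exe", "hadoop.dll"}) {
            if (!new File(bin, name).isFile()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String home = System.getenv("HADOOP_HOME");
        if (home == null) {
            System.out.println("HADOOP_HOME is not set");
            return;
        }
        System.out.println("Missing files: " + missingNativeFiles(new File(home)));
    }
}
```

An empty list means both files were found. It does not tell you whether winutils.exe can actually start, which is what the DLL repair step above is for.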