hadoop學(xué)習(xí)筆記(六)WordCount

我們編寫MapReduce程序,wordCount荐糜。
首先在maven中添加依賴

   <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.5</version>
        </dependency>

然后是我們的代碼
MapReduceDemo類

public class MapReduceDemo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //設(shè)置user為namenode的hostname沒有修改客戶端user名的設(shè)置
        System.setProperty("HADOOP_USER_NAME","gaojz");
        Configuration conf = new Configuration();   conf.set("fs.defaultFS","hdfs://node1:8020");      conf.set("yarn.resourcemanager.hostname","master");
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapReduceDemo.class);
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(WCReduce.class);
//要執(zhí)行的文件地址逮刨,需要先上傳到hdfs該路徑下
        FileInputFormat.addInputPath(job,new Path("/wc/input/wc"));
//文件輸出位置
        Path outpath = new Path("/wc/output1/wcout");
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(outpath))
            fs.delete(outpath,true);
        FileOutputFormat.setOutputPath(job,outpath);

        boolean flag = job.waitForCompletion(true);
        if(flag)
            System.out.println("job success");
    }
}

WCMapper類

public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String str = value.toString();
        String[] strs = StringUtils.split(str,' ');
        for(String s:strs){
            context.write(new Text(s),new IntWritable(1));
        }
    }
}

WCReducer類

public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable i:values){
            sum += i.get();
        }
        context.write(key,new IntWritable(sum));
    }
}

我們程序有三種運(yùn)行模式到千,首先第一種為本地測(cè)試環(huán)境運(yùn)行嘱巾。
需要在windows下配置hadoop:
將下載的hadoop解壓到某個(gè)目錄下,然后配置環(huán)境變量
添加HADOOP_HOME為hadoop解壓目錄锰蓬,然后再path中添加%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin;
下載winutils.exe添加到HADOOP_HOME/bin下,修改hadoop源碼眯漩,位置:
org.apache.hadoop.io.nativeio.NativeIO
將該類中access方法的返回值改為true



NativeIO源碼

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.io.nativeio;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
import org.apache.hadoop.io.nativeio.Errno;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.PerformanceAdvisory;
import org.apache.hadoop.util.Shell;
import sun.misc.Cleaner;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;

@Private
@Unstable
public class NativeIO {
    private static boolean workaroundNonThreadSafePasswdCalls = false;
    private static final Log LOG = LogFactory.getLog(NativeIO.class);
    private static boolean nativeLoaded = false;
    private static final Map<Long, NativeIO.CachedUid> uidCache;
    private static long cacheTimeout;
    private static boolean initialized;

    public NativeIO() {
    }

    public static boolean isAvailable() {
        return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
    }

    private static native void initNative();

    static long getMemlockLimit() {
        return isAvailable()?getMemlockLimit0():0L;
    }

    private static native long getMemlockLimit0();

    static long getOperatingSystemPageSize() {
        try {
            Field e = Unsafe.class.getDeclaredField("theUnsafe");
            e.setAccessible(true);
            Unsafe unsafe = (Unsafe)e.get((Object)null);
            return (long)unsafe.pageSize();
        } catch (Throwable var2) {
            LOG.warn("Unable to get operating system page size.  Guessing 4096.", var2);
            return 4096L;
        }
    }

    private static String stripDomain(String name) {
        int i = name.indexOf(92);
        if(i != -1) {
            name = name.substring(i + 1);
        }

        return name;
    }

    public static String getOwner(FileDescriptor fd) throws IOException {
        ensureInitialized();
        if(Shell.WINDOWS) {
            String uid1 = NativeIO.Windows.getOwner(fd);
            uid1 = stripDomain(uid1);
            return uid1;
        } else {
            long uid = NativeIO.POSIX.getUIDforFDOwnerforOwner(fd);
            NativeIO.CachedUid cUid = (NativeIO.CachedUid)uidCache.get(Long.valueOf(uid));
            long now = System.currentTimeMillis();
            if(cUid != null && cUid.timestamp + cacheTimeout > now) {
                return cUid.username;
            } else {
                String user = NativeIO.POSIX.getUserName(uid);
                LOG.info("Got UserName " + user + " for UID " + uid + " from the native implementation");
                cUid = new NativeIO.CachedUid(user, now);
                uidCache.put(Long.valueOf(uid), cUid);
                return user;
            }
        }
    }

    public static FileInputStream getShareDeleteFileInputStream(File f) throws IOException {
        if(!Shell.WINDOWS) {
            return new FileInputStream(f);
        } else {
            FileDescriptor fd = NativeIO.Windows.createFile(f.getAbsolutePath(), 2147483648L, 7L, 3L);
            return new FileInputStream(fd);
        }
    }

    public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset) throws IOException {
        if(!Shell.WINDOWS) {
            RandomAccessFile fd1 = new RandomAccessFile(f, "r");
            if(seekOffset > 0L) {
                fd1.seek(seekOffset);
            }

            return new FileInputStream(fd1.getFD());
        } else {
            FileDescriptor fd = NativeIO.Windows.createFile(f.getAbsolutePath(), 2147483648L, 7L, 3L);
            if(seekOffset > 0L) {
                NativeIO.Windows.setFilePointer(fd, seekOffset, 0L);
            }

            return new FileInputStream(fd);
        }
    }

    public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions) throws IOException {
        FileDescriptor nioe;
        if(!Shell.WINDOWS) {
            try {
                nioe = NativeIO.POSIX.open(f.getAbsolutePath(), 193, permissions);
                return new FileOutputStream(nioe);
            } catch (NativeIOException var3) {
                if(var3.getErrno() == Errno.EEXIST) {
                    throw new AlreadyExistsException(var3);
                } else {
                    throw var3;
                }
            }
        } else {
            try {
                nioe = NativeIO.Windows.createFile(f.getCanonicalPath(), 1073741824L, 7L, 1L);
                NativeIO.POSIX.chmod(f.getCanonicalPath(), permissions);
                return new FileOutputStream(nioe);
            } catch (NativeIOException var4) {
                if(var4.getErrorCode() == 80L) {
                    throw new AlreadyExistsException(var4);
                } else {
                    throw var4;
                }
            }
        }
    }

    private static synchronized void ensureInitialized() {
        if(!initialized) {
            cacheTimeout = (new Configuration()).getLong("hadoop.security.uid.cache.secs", 14400L) * 1000L;
            LOG.info("Initialized cache for UID to User mapping with a cache timeout of " + cacheTimeout / 1000L + " seconds.");
            initialized = true;
        }

    }

    public static void renameTo(File src, File dst) throws IOException {
        if(!nativeLoaded) {
            if(!src.renameTo(dst)) {
                throw new IOException("renameTo(src=" + src + ", dst=" + dst + ") failed.");
            }
        } else {
            renameTo0(src.getAbsolutePath(), dst.getAbsolutePath());
        }

    }

    public static void link(File src, File dst) throws IOException {
        if(!nativeLoaded) {
            HardLink.createHardLink(src, dst);
        } else {
            link0(src.getAbsolutePath(), dst.getAbsolutePath());
        }

    }

    private static native void renameTo0(String var0, String var1) throws NativeIOException;

    private static native void link0(String var0, String var1) throws NativeIOException;

    public static void copyFileUnbuffered(File src, File dst) throws IOException {
        if(nativeLoaded && Shell.WINDOWS) {
            copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
        } else {
            FileInputStream fis = null;
            FileOutputStream fos = null;
            FileChannel input = null;
            FileChannel output = null;

            try {
                fis = new FileInputStream(src);
                fos = new FileOutputStream(dst);
                input = fis.getChannel();
                output = fos.getChannel();
                long remaining = input.size();
                long position = 0L;

                for(long transferred = 0L; remaining > 0L; position += transferred) {
                    transferred = input.transferTo(position, remaining, output);
                    remaining -= transferred;
                }
            } finally {
                IOUtils.cleanup(LOG, new Closeable[]{output});
                IOUtils.cleanup(LOG, new Closeable[]{fos});
                IOUtils.cleanup(LOG, new Closeable[]{input});
                IOUtils.cleanup(LOG, new Closeable[]{fis});
            }
        }

    }

    private static native void copyFileUnbuffered0(String var0, String var1) throws NativeIOException;

    static {
        if(NativeCodeLoader.isNativeCodeLoaded()) {
            try {
                initNative();
                nativeLoaded = true;
            } catch (Throwable var1) {
                PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", var1);
            }
        }

        uidCache = new ConcurrentHashMap();
        initialized = false;
    }

    private static class CachedUid {
        final long timestamp;
        final String username;

        public CachedUid(String username, long timestamp) {
            this.timestamp = timestamp;
            this.username = username;
        }
    }

    public static class Windows {
        public static final long GENERIC_READ = 2147483648L;
        public static final long GENERIC_WRITE = 1073741824L;
        public static final long FILE_SHARE_READ = 1L;
        public static final long FILE_SHARE_WRITE = 2L;
        public static final long FILE_SHARE_DELETE = 4L;
        public static final long CREATE_NEW = 1L;
        public static final long CREATE_ALWAYS = 2L;
        public static final long OPEN_EXISTING = 3L;
        public static final long OPEN_ALWAYS = 4L;
        public static final long TRUNCATE_EXISTING = 5L;
        public static final long FILE_BEGIN = 0L;
        public static final long FILE_CURRENT = 1L;
        public static final long FILE_END = 2L;
        public static final long FILE_ATTRIBUTE_NORMAL = 128L;

        public Windows() {
        }

        public static void createDirectoryWithMode(File path, int mode) throws IOException {
            createDirectoryWithMode0(path.getAbsolutePath(), mode);
        }

        private static native void createDirectoryWithMode0(String var0, int var1) throws NativeIOException;

        public static native FileDescriptor createFile(String var0, long var1, long var3, long var5) throws IOException;

        public static FileOutputStream createFileOutputStreamWithMode(File path, boolean append, int mode) throws IOException {
            long desiredAccess = 1073741824L;
            long shareMode = 3L;
            long creationDisposition = append?4L:2L;
            return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(), desiredAccess, shareMode, creationDisposition, mode));
        }

        private static native FileDescriptor createFileWithMode0(String var0, long var1, long var3, long var5, int var7) throws NativeIOException;

        public static native long setFilePointer(FileDescriptor var0, long var1, long var3) throws IOException;

        private static native String getOwner(FileDescriptor var0) throws IOException;

        private static native boolean access0(String var0, int var1);

        public static boolean access(String path, NativeIO.Windows.AccessRight desiredAccess) throws IOException {
//            return access0(path, desiredAccess.accessRight());
            return true;
        }

        public static native void extendWorkingSetSize(long var0) throws IOException;

        static {
            if(NativeCodeLoader.isNativeCodeLoaded()) {
                try {
                    NativeIO.initNative();
                    NativeIO.nativeLoaded = true;
                } catch (Throwable var1) {
                    PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", var1);
                }
            }

        }

        public static enum AccessRight {
            ACCESS_READ(1),
            ACCESS_WRITE(2),
            ACCESS_EXECUTE(32);

            private final int accessRight;

            private AccessRight(int access) {
                this.accessRight = access;
            }

            public int accessRight() {
                return this.accessRight;
            }
        }
    }

    public static class POSIX {
        public static final int O_RDONLY = 0;
        public static final int O_WRONLY = 1;
        public static final int O_RDWR = 2;
        public static final int O_CREAT = 64;
        public static final int O_EXCL = 128;
        public static final int O_NOCTTY = 256;
        public static final int O_TRUNC = 512;
        public static final int O_APPEND = 1024;
        public static final int O_NONBLOCK = 2048;
        public static final int O_SYNC = 4096;
        public static final int O_ASYNC = 8192;
        public static final int O_FSYNC = 4096;
        public static final int O_NDELAY = 2048;
        public static final int POSIX_FADV_NORMAL = 0;
        public static final int POSIX_FADV_RANDOM = 1;
        public static final int POSIX_FADV_SEQUENTIAL = 2;
        public static final int POSIX_FADV_WILLNEED = 3;
        public static final int POSIX_FADV_DONTNEED = 4;
        public static final int POSIX_FADV_NOREUSE = 5;
        public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
        public static final int SYNC_FILE_RANGE_WRITE = 2;
        public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
        private static final Log LOG = LogFactory.getLog(NativeIO.class);
        private static boolean nativeLoaded = false;
        private static boolean fadvisePossible = true;
        private static boolean syncFileRangePossible = true;
        static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY = "hadoop.workaround.non.threadsafe.getpwuid";
        static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = true;
        private static long cacheTimeout = -1L;
        private static NativeIO.POSIX.CacheManipulator cacheManipulator = new NativeIO.POSIX.CacheManipulator();
        private static final Map<Integer, NativeIO.POSIX.CachedName> USER_ID_NAME_CACHE;
        private static final Map<Integer, NativeIO.POSIX.CachedName> GROUP_ID_NAME_CACHE;
        public static final int MMAP_PROT_READ = 1;
        public static final int MMAP_PROT_WRITE = 2;
        public static final int MMAP_PROT_EXEC = 4;

        public POSIX() {
        }

        public static NativeIO.POSIX.CacheManipulator getCacheManipulator() {
            return cacheManipulator;
        }

        public static void setCacheManipulator(NativeIO.POSIX.CacheManipulator cacheManipulator) {
            NativeIO.POSIX.cacheManipulator = cacheManipulator;
        }

        public static boolean isAvailable() {
            return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
        }

        private static void assertCodeLoaded() throws IOException {
            if(!isAvailable()) {
                throw new IOException("NativeIO was not loaded");
            }
        }

        public static native FileDescriptor open(String var0, int var1, int var2) throws IOException;

        private static native NativeIO.POSIX.Stat fstat(FileDescriptor var0) throws IOException;

        private static native void chmodImpl(String var0, int var1) throws IOException;

        public static void chmod(String path, int mode) throws IOException {
            if(!Shell.WINDOWS) {
                chmodImpl(path, mode);
            } else {
                try {
                    chmodImpl(path, mode);
                } catch (NativeIOException var3) {
                    if(var3.getErrorCode() == 3L) {
                        throw new NativeIOException("No such file or directory", Errno.ENOENT);
                    }

                    LOG.warn(String.format("NativeIO.chmod error (%d): %s", new Object[]{Long.valueOf(var3.getErrorCode()), var3.getMessage()}));
                    throw new NativeIOException("Unknown error", Errno.UNKNOWN);
                }
            }

        }

        static native void posix_fadvise(FileDescriptor var0, long var1, long var3, int var5) throws NativeIOException;

        static native void sync_file_range(FileDescriptor var0, long var1, long var3, int var5) throws NativeIOException;

        static void posixFadviseIfPossible(String identifier, FileDescriptor fd, long offset, long len, int flags) throws NativeIOException {
            if(nativeLoaded && fadvisePossible) {
                try {
                    posix_fadvise(fd, offset, len, flags);
                } catch (UnsupportedOperationException var8) {
                    fadvisePossible = false;
                } catch (UnsatisfiedLinkError var9) {
                    fadvisePossible = false;
                }
            }

        }

        public static void syncFileRangeIfPossible(FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException {
            if(nativeLoaded && syncFileRangePossible) {
                try {
                    sync_file_range(fd, offset, nbytes, flags);
                } catch (UnsupportedOperationException var7) {
                    syncFileRangePossible = false;
                } catch (UnsatisfiedLinkError var8) {
                    syncFileRangePossible = false;
                }
            }

        }

        static native void mlock_native(ByteBuffer var0, long var1) throws NativeIOException;

        static void mlock(ByteBuffer buffer, long len) throws IOException {
            assertCodeLoaded();
            if(!buffer.isDirect()) {
                throw new IOException("Cannot mlock a non-direct ByteBuffer");
            } else {
                mlock_native(buffer, len);
            }
        }

        public static void munmap(MappedByteBuffer buffer) {
            if(buffer instanceof DirectBuffer) {
                Cleaner cleaner = ((DirectBuffer)buffer).cleaner();
                cleaner.clean();
            }

        }

        private static native long getUIDforFDOwnerforOwner(FileDescriptor var0) throws IOException;

        private static native String getUserName(long var0) throws IOException;

        public static NativeIO.POSIX.Stat getFstat(FileDescriptor fd) throws IOException {
            NativeIO.POSIX.Stat stat = null;
            if(!Shell.WINDOWS) {
                stat = fstat(fd);
                stat.owner = getName(NativeIO.POSIX.IdCache.USER, stat.ownerId);
                stat.group = getName(NativeIO.POSIX.IdCache.GROUP, stat.groupId);
            } else {
                try {
                    stat = fstat(fd);
                } catch (NativeIOException var3) {
                    if(var3.getErrorCode() == 6L) {
                        throw new NativeIOException("The handle is invalid.", Errno.EBADF);
                    }

                    LOG.warn(String.format("NativeIO.getFstat error (%d): %s", new Object[]{Long.valueOf(var3.getErrorCode()), var3.getMessage()}));
                    throw new NativeIOException("Unknown error", Errno.UNKNOWN);
                }
            }

            return stat;
        }

        private static String getName(NativeIO.POSIX.IdCache domain, int id) throws IOException {
            Map idNameCache = domain == NativeIO.POSIX.IdCache.USER?USER_ID_NAME_CACHE:GROUP_ID_NAME_CACHE;
            NativeIO.POSIX.CachedName cachedName = (NativeIO.POSIX.CachedName)idNameCache.get(Integer.valueOf(id));
            long now = System.currentTimeMillis();
            String name;
            if(cachedName != null && cachedName.timestamp + cacheTimeout > now) {
                name = cachedName.name;
            } else {
                name = domain == NativeIO.POSIX.IdCache.USER?getUserName(id):getGroupName(id);
                if(LOG.isDebugEnabled()) {
                    String type = domain == NativeIO.POSIX.IdCache.USER?"UserName":"GroupName";
                    LOG.debug("Got " + type + " " + name + " for ID " + id + " from the native implementation");
                }

                cachedName = new NativeIO.POSIX.CachedName(name, now);
                idNameCache.put(Integer.valueOf(id), cachedName);
            }

            return name;
        }

        static native String getUserName(int var0) throws IOException;

        static native String getGroupName(int var0) throws IOException;

        public static native long mmap(FileDescriptor var0, int var1, boolean var2, long var3) throws IOException;

        public static native void munmap(long var0, long var2) throws IOException;

        static {
            if(NativeCodeLoader.isNativeCodeLoaded()) {
                try {
                    Configuration t = new Configuration();
                    NativeIO.workaroundNonThreadSafePasswdCalls = t.getBoolean("hadoop.workaround.non.threadsafe.getpwuid", true);
                    NativeIO.initNative();
                    nativeLoaded = true;
                    cacheTimeout = t.getLong("hadoop.security.uid.cache.secs", 14400L) * 1000L;
                    LOG.debug("Initialized cache for IDs to User/Group mapping with a  cache timeout of " + cacheTimeout / 1000L + " seconds.");
                } catch (Throwable var1) {
                    PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", var1);
                }
            }

            USER_ID_NAME_CACHE = new ConcurrentHashMap();
            GROUP_ID_NAME_CACHE = new ConcurrentHashMap();
        }

        private static enum IdCache {
            USER,
            GROUP;

            private IdCache() {
            }
        }

        private static class CachedName {
            final long timestamp;
            final String name;

            public CachedName(String name, long timestamp) {
                this.name = name;
                this.timestamp = timestamp;
            }
        }

        public static class Stat {
            private int ownerId;
            private int groupId;
            private String owner;
            private String group;
            private int mode;
            public static final int S_IFMT = 61440;
            public static final int S_IFIFO = 4096;
            public static final int S_IFCHR = 8192;
            public static final int S_IFDIR = 16384;
            public static final int S_IFBLK = 24576;
            public static final int S_IFREG = 32768;
            public static final int S_IFLNK = 40960;
            public static final int S_IFSOCK = 49152;
            public static final int S_IFWHT = 57344;
            public static final int S_ISUID = 2048;
            public static final int S_ISGID = 1024;
            public static final int S_ISVTX = 512;
            public static final int S_IRUSR = 256;
            public static final int S_IWUSR = 128;
            public static final int S_IXUSR = 64;

            Stat(int ownerId, int groupId, int mode) {
                this.ownerId = ownerId;
                this.groupId = groupId;
                this.mode = mode;
            }

            Stat(String owner, String group, int mode) {
                if(!Shell.WINDOWS) {
                    this.owner = owner;
                } else {
                    this.owner = NativeIO.stripDomain(owner);
                }

                if(!Shell.WINDOWS) {
                    this.group = group;
                } else {
                    this.group = NativeIO.stripDomain(group);
                }

                this.mode = mode;
            }

            public String toString() {
                return "Stat(owner=\'" + this.owner + "\', group=\'" + this.group + "\'" + ", mode=" + this.mode + ")";
            }

            public String getOwner() {
                return this.owner;
            }

            public String getGroup() {
                return this.group;
            }

            public int getMode() {
                return this.mode;
            }
        }

        @VisibleForTesting
        public static class NoMlockCacheManipulator extends NativeIO.POSIX.CacheManipulator {
            public NoMlockCacheManipulator() {
            }

            public void mlock(String identifier, ByteBuffer buffer, long len) throws IOException {
                NativeIO.POSIX.LOG.info("mlocking " + identifier);
            }

            public long getMemlockLimit() {
                return 1125899906842624L;
            }

            public long getOperatingSystemPageSize() {
                return 4096L;
            }

            public boolean verifyCanMlock() {
                return true;
            }
        }

        @VisibleForTesting
        public static class CacheManipulator {
            public CacheManipulator() {
            }

            public void mlock(String identifier, ByteBuffer buffer, long len) throws IOException {
                NativeIO.POSIX.mlock(buffer, len);
            }

            public long getMemlockLimit() {
                return NativeIO.getMemlockLimit();
            }

            public long getOperatingSystemPageSize() {
                return NativeIO.getOperatingSystemPageSize();
            }

            public void posixFadviseIfPossible(String identifier, FileDescriptor fd, long offset, long len, int flags) throws NativeIOException {
                NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset, len, flags);
            }

            public boolean verifyCanMlock() {
                return NativeIO.isAvailable();
            }
        }
    }
}

替換源碼方法為芹扭,在工程下建一個(gè)同樣路徑的類。



src目錄不能有hadoop的配置文件,如果有冯勉,刪除它澈蚌。
代碼中要有:

Configuration conf = new Configuration();  
//active狀態(tài)namenode的地址
     conf.set("fs.defaultFS","hdfs://node1:8020");  
//active狀態(tài)ResourceManager
     conf.set("yarn.resourcemanager.hostname","master");

第二種運(yùn)行方式是本地調(diào)用服務(wù)器環(huán)境。

修改hadoop源碼灼狰,和第一種一樣宛瞄,將

conf.set("fs.defaultFS","hdfs://node1:8020");      conf.set("yarn.resourcemanager.hostname","master");

更改為

//jar文件路徑
conf.set("mapred.jar","D:\\MR\\MR.jar");

在src下添加配置文件:
core-site.xml
hdfs-site.xml
maperd-site.xml
yarn-site.xml
log4j.properties,內(nèi)容為:

# Configure logging for testing: optionally with log file

#log4j.rootLogger=debug,appender
log4j.rootLogger=info,appender  
#log4j.rootLogger=error,appender

#\u8F93\u51FA\u5230\u63A7\u5236\u53F0
log4j.appender.appender=org.apache.log4j.ConsoleAppender  
#\u6837\u5F0F\u4E3ATTCCLayout
log4j.appender.appender.layout=org.apache.log4j.TTCCLayout  

將項(xiàng)目打成jar包放在本地:如D:\MR\MR.jar
運(yùn)行時(shí)有可能報(bào)錯(cuò)

Stack trace: ExitCodeException exitCode=1: /bin/bash: line 0: fg: no job control

    at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
    at org.apache.hadoop.util.Shell.run(Shell.java:482)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

需要在mapred-site.xml添加配置:

<property>
        <name>mapreduce.app-submission.cross-platform</name>
        <value>true</value>
    </property>

第三種為直接放到服務(wù)器上運(yùn)行
這種方法不需要修改源碼
刪掉第二種的

conf.set("mapred.jar","D:\\MR\\MR.jar");

將程序打成jar包,放到hadoop集群服務(wù)器上交胚,執(zhí)行

hadoop jar jar路徑 類的全限定名

執(zhí)行
比如我的類路徑是hadoop.MapReduceDemo
那么就執(zhí)行
hadoop jar MR.jar hadoop.MapReduceDemo

執(zhí)行結(jié)果對(duì)比:
源文件:



輸出文件:


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末份汗,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蝴簇,更是在濱河造成了極大的恐慌杯活,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件熬词,死亡現(xiàn)場(chǎng)離奇詭異旁钧,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)互拾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門歪今,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人颜矿,你說我怎么就攤上這事寄猩。” “怎么了骑疆?”我有些...
    開封第一講書人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵田篇,是天一觀的道長。 經(jīng)常有香客問我箍铭,道長泊柬,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任诈火,我火速辦了婚禮彬呻,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘柄瑰。我一直安慰自己闸氮,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開白布教沾。 她就那樣靜靜地躺著蒲跨,像睡著了一般。 火紅的嫁衣襯著肌膚如雪授翻。 梳的紋絲不亂的頭發(fā)上或悲,一...
    開封第一講書人閱讀 51,604評(píng)論 1 305
  • 那天孙咪,我揣著相機(jī)與錄音,去河邊找鬼巡语。 笑死翎蹈,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的男公。 我是一名探鬼主播荤堪,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼枢赔!你這毒婦竟也來了澄阳?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤踏拜,失蹤者是張志新(化名)和其女友劉穎碎赢,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體速梗,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肮塞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了姻锁。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片枕赵。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖屋摔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情替梨,我是刑警寧澤钓试,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站副瀑,受9級(jí)特大地震影響弓熏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜糠睡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一挽鞠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧狈孔,春花似錦信认、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至油挥,卻和暖如春潦蝇,著一層夾襖步出監(jiān)牢的瞬間款熬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來泰國打工攘乒, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留贤牛,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓则酝,卻偏偏與公主長得像殉簸,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子堤魁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355