關(guān)于海量數(shù)據(jù)取TopN問題

最近一直在思考一個問題:關(guān)于海量數(shù)據(jù)避凝,如果不借助Hadoop/MapReduce模型該如何處理呢畸冲?首先,我們可以先來了解一下MapReduce模型给梅,MapReduce是一個高度抽象易用編程的模型寓搬,它是在總結(jié)大量應(yīng)用的共同特點的基礎(chǔ)上抽象出來的分布式計算框架珍昨,在其編程模型中,通過Map和Reduce倆大組件將任務(wù)分解成互相獨立的子任務(wù)句喷,然后對每個子任務(wù)進(jìn)行處理镣典,最后將處理的結(jié)果匯總輸出。從上面的描述我們可以看出MapReduce采取的是"分而治之"的思想唾琼,那么面對海量數(shù)據(jù)處理時我們是否也可以借鑒這種思想兄春,答案是肯定的。下面我們借助具體案例來分析锡溯。

案列:海量日志訪問數(shù)據(jù),提取出訪問次數(shù)Top10的訪問地址信息列表

首先我們對問題進(jìn)行分析:

  • IP是32位的,地址最多有2^32=4G種取值情況
  • 海量數(shù)據(jù)超過單臺機器處理能力赶舆,不能一次將數(shù)據(jù)全部加載到內(nèi)存,我們可以將IP地址Hash(IP)/1024到1024個小文件中,每個小文件存放部分IP地址;
  • 對于每一個小文件,可以構(gòu)建一個IP為key,出現(xiàn)次數(shù)為value的HashMap祭饭,同時記錄當(dāng)前出現(xiàn)次數(shù)最多的那個IP地址;
  • 可以得到1024個小文件中的出現(xiàn)次數(shù)最多的IP,再依據(jù)常規(guī)的排序算法得到總體上出現(xiàn)次數(shù)最多Top10的IP即就是所求IP
  1. pom.xml依賴
1.pom.xml依賴
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.18</version>
</dependency>
<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>20.0</version>
</dependency>

  1. 具體算法實現(xiàn)
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.FluentIterable;
import com.google.common.io.ByteSink;
import com.google.common.io.Files;
import com.sun.istack.internal.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static lombok.Lombok.checkNotNull;

/**
 * Created by edwin on 2019/4/29.
 * eg:海量日志數(shù)據(jù)芜茵,提取出訪問前N次的IP信息
 * <p>
 * IP是32位的,地址最多有2^32=4G種取值情況,不能完全加載到內(nèi)存中處理;
 * 采用"分而治之"的思想,按照IP地址的Hash(IP)/1024值,把海量IP日志分別切割存儲到1024個小文件中,每個小文件最多包含4MB個IP地址;
 * 對于每一個小文件,可以構(gòu)建一個IP為key,出現(xiàn)次數(shù)為value的HashMap,同時記錄當(dāng)前出現(xiàn)次數(shù)最多的那個IP地址;
 * 可以得到1024個小文件中的出現(xiàn)次數(shù)最多的IP,再依據(jù)常規(guī)的排序算法得到總體上出現(xiàn)次數(shù)最多TopN的IP;
 *
 * @author edwin
 */
@Slf4j
public class SimpleTopN {

    /***
     * 保存每個文件的ByteSink對象
     */
    private final Map<Integer, ByteSink> bufferedMap = new HashMap<Integer, ByteSink>();

    /***
     * 分隔文件-緩存每個小文件存放對象
     */
    private final Map<Integer, List<String>> dataMap = new HashMap<Integer, List<String>>();

    /***
     * 切割文件
     * 將源大文件切割成小文件倡蝙,然后將值Hash到對應(yīng)的小文件里
     * @param sourceFile   源文件
     * @param dataShardingPath  小文件分片路徑
     * @param dataSharding 分片數(shù)量
     * @throws Exception
     */
    public void splitSharding(File sourceFile, String dataShardingPath, int dataSharding) throws Exception {
        checkNotNull(sourceFile, "sourceFile must not be null.");
        checkNotNull(dataShardingPath, "dataShardingPath must not be null.");
        checkNotNull(dataSharding, "dataSharding must not be null.");
        Stopwatch stopwatch = Stopwatch.createStarted();
        //創(chuàng)建小文件
        for (int i = 0; i < dataSharding; i++) {
            File file = new File(dataShardingPath + "shard_" + i + ".txt");
            if (!file.exists()) {
                file.createNewFile();
            }
            bufferedMap.put(i, Files.asByteSink(file));
            dataMap.put(i, new LinkedList<String>());
        }

        //讀取源文件
        //readBigDataFileByGuava(sourceFile,dataSharding);

        //讀取源文件
        readBigDataFileByCommonsIO(sourceFile, dataSharding);
        long costTimes = stopwatch.elapsed(TimeUnit.MILLISECONDS);
        log.info("sharding file finish, total cost time:{} ms.", costTimes);
    }

    /***
     * Guava readLines 方式讀取文件
     * <p>需全量讀入內(nèi)存九串,如果數(shù)據(jù)文件過大會造成內(nèi)存溢出OutOfMemoryError</p>
     * @param sourceFile
     * @param dataSharding
     */
    private void readBigDataFileByGuava(File sourceFile, int dataSharding) {
        try {
            List<String> readLines = Files.readLines(sourceFile, Charsets.UTF_8);
            for (String ip : readLines) {
                //按照IP地址的Hash(IPNode)%1024值,把整個大文件映射為1024個小文件
                int fileIndex = hashCode(ip) % dataSharding;
                List<String> list = dataMap.get(fileIndex);
                list.add(ip + "\n");
                if (list.size() % 1000 == 0) {
                    //將數(shù)據(jù)寫入文件
                    ByteSink byteSink = bufferedMap.get(fileIndex);
                    byteSink.write(Joiner.on(" ").join(list.toArray()).getBytes());
                }
            }
        } catch (Exception e) {
            log.error("read(Guava readLines) file exception,msg:{}", e.getMessage(), e);
        }
    }


    /****
     * Apache Commons IO方式讀取文件
     * <p>非全量讀入內(nèi)存,資源消耗小</p>
     * @param sourceFile 源文件
     * @param dataSharding 分片數(shù)
     */
    private void readBigDataFileByCommonsIO(File sourceFile, int dataSharding) {
        LineIterator it = null;
        try {
            //使用Apache Commons 自定義LineIterator處理IO流
            it = FileUtils.lineIterator(sourceFile, "UTF-8");
            while (it.hasNext()) {
                //讀取每一行數(shù)據(jù)
                String ip = it.nextLine();
                int fileIndex = hashCode(ip) % dataSharding;
                List<String> list = dataMap.get(fileIndex);
                list.add(ip + "\n");
                if (list.size() % 1000 == 0) {
                    //將數(shù)據(jù)寫入文件
                    ByteSink byteSink = bufferedMap.get(fileIndex);
                    byteSink.write(Joiner.on(" ").join(list.toArray()).getBytes());
                }
            }
        } catch (Exception e) {
            log.error("read(Apache Commons IO) file exception,msg:{}", e.getMessage(), e);
        } finally {
            LineIterator.closeQuietly(it);
        }
    }


    /***
     * 分析數(shù)據(jù)
     * @param dataShardingPath 數(shù)據(jù)文件目錄
     * @param topNumber 訪問前TopN值
     * @return
     * @throws Exception
     */
    private List<Map.Entry<String, Integer>> analysis(String dataShardingPath, int topNumber) throws Exception {
        checkNotNull(dataShardingPath, "dataShardingPath must not be null.");
        Stopwatch stopwatch = Stopwatch.createStarted();
        File shardingFile = new File(dataShardingPath);
        //獲取Path下所有子目錄
        //Iterable<File> childrens = Files.fileTreeTraverser().children(shardingFile);
        //獲取Path目錄下所有目錄包含 preOrderTraversal(前序遍歷)  postOrderTraversal(后序遍歷)  breadthFirstTraversal(廣度優(yōu)先)
        FluentIterable<File> childrens = Files.fileTreeTraverser().breadthFirstTraversal(shardingFile).filter(new Predicate<File>() {
            @Override
            public boolean apply(@Nullable File file) {
                //過濾analysis目錄
                return !file.getName().equals("analysis");
            }
        });
        log.info("scan sharding directory:{}, file total : {}", dataShardingPath, childrens.size());
        //存放每個小文件訪問最多次數(shù)IP集合
        Map<String, Integer> collectMap = new HashMap<String, Integer>();
        for (File file : childrens) {
            //臨時存放當(dāng)前文件所有ip
            Map<String, Integer> tempMap = new HashMap<String, Integer>();
            List<String> readLines = Files.readLines(file, Charsets.UTF_8);
            for (String ip : readLines) {
                ip = ip.replaceAll("\r|\n", "").trim();
                if (tempMap.containsKey(ip)) {
                    tempMap.put(ip, tempMap.get(ip) + 1);
                } else {
                    tempMap.put(ip, 1);
                }
            }
            //Collectors.toMap 直接返回排好序的map
            tempMap = tempMap.entrySet().stream()
                    .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                    .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue(), (x1, x2) -> x2, LinkedHashMap<String, Integer>::new));

            //獲取分片訪問次數(shù)最多的IP,并將其匯總到集合
            Map.Entry<String, Integer> entry = tempMap.entrySet().iterator().next();
            collectMap.put(entry.getKey(), entry.getValue());
        }
        //將Map轉(zhuǎn)換為List
        List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(collectMap.entrySet());
        //倒序排列
        Collections.sort(list, (o1, o2) -> o2.getValue().compareTo(o1.getValue()));
        //取出TopN的IP信息
        List<Map.Entry<String, Integer>> limitList = list.stream().limit(topNumber).collect(Collectors.toList());
        long costTimes = stopwatch.elapsed(TimeUnit.MILLISECONDS);
        log.info("analysis file finish, total cost time:{} ms.", costTimes);
        return limitList;
    }


    /***
     * Hash算法
     * @param key
     * @return
     */
    private int hashCode(String key) {
        int hash;
        int i;
        for (hash = 0, i = 0; i < key.length(); ++i) {
            hash += key.charAt(i);
            hash += (hash << 10);
            hash ^= (hash >> 6);
        }
        hash += (hash << 3);
        hash ^= (hash >> 11);
        hash += (hash << 15);
        return Math.abs(hash);
    }

    /***
     * Hash算法2
     * @param key
     * @return
     */
    private final int hashCode2(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }


    public void generateIpFiles(String filePathName, int ipCount) {
        try {
            Stopwatch stopwatch = Stopwatch.createStarted();
            File file = new File(filePathName);
            file.createNewFile();
            StringBuffer ipAddress = new StringBuffer();
            for (int i = 0; i < ipCount; i++) {
                //文件追加多次I/O比較慢有性能問題,這里將每次生成的ip地址buffer起來,再一次寫入文件
                //根據(jù)自己的機器配置及需要生成的ip數(shù)量選擇是否需要buffer,如果數(shù)據(jù)量過大會產(chǎn)生java.lang.OutOfMemoryError
                //Files.append(generateRandomIp()+"\n", file, Charsets.UTF_8);
                ipAddress.append(generateRandomIp() + "\n");
            }
            Files.write(ipAddress.toString(), file, Charsets.UTF_8);
            long time = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            log.info("Generate ip finish, ip count:{} , total cost time:{} ms.", ipCount, time);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /***
     * 生成一個隨機IP
     * Tips:
     * IP范圍,IP地址是一個32位的二進(jìn)制數(shù)寺鸥,通常被分割為4個"8位二進(jìn)制數(shù)"(也就是4個字節(jié)).
     * IP地址通常用"點分十進(jìn)制"表示成(a.b.c.d)的形式猪钮,其中,a,b,c,d都是0~255之間的十進(jìn)制整數(shù)胆建。
     * 例:點分十進(jìn)IP地址(100.4.5.6)烤低,實際上是32位二進(jìn)制數(shù)(11000000.10100111.00010111.00111000)
     * @return
     */
    private String generateRandomIp() {
        int[][] range = {{607649792, 608174079}, // 36.56.0.0-36.63.255.255
                {1038614528, 1039007743}, // 61.232.0.0-61.237.255.255
                {1783627776, 1784676351}, // 106.80.0.0-106.95.255.255
                {2035023872, 2035154943}, // 121.76.0.0-121.77.255.255
                {2078801920, 2079064063}, // 123.232.0.0-123.235.255.255
                {-1950089216, -1948778497}, // 139.196.0.0-139.215.255.255
                {-1425539072, -1425014785}, // 171.8.0.0-171.15.255.255
                {-1236271104, -1235419137}, // 182.80.0.0-182.92.255.255
                {-770113536, -768606209}, // 210.25.0.0-210.47.255.255
                {-569376768, -564133889}, // 222.16.0.0-222.95.255.255
        };
        Random random = new Random();
        int index = random.nextInt(10);
        String ip = convert2IpAddress(range[index][0] + new Random().nextInt(range[index][1] - range[index][0]));
        return ip;
    }

    /***
     * 將十進(jìn)制轉(zhuǎn)換成IP地址
     * @param ip
     * @return
     */
    private String convert2IpAddress(int ip) {
        int[] ipArray = new int[4];
        ipArray[0] = (int) ((ip >> 24) & 0xff);
        ipArray[1] = (int) ((ip >> 16) & 0xff);
        ipArray[2] = (int) ((ip >> 8) & 0xff);
        ipArray[3] = (int) (ip & 0xff);
        String realIp = Integer.toString(ipArray[0]) + "." + Integer.toString(ipArray[1]) + "." + Integer.toString(ipArray[2]) + "." + Integer.toString(ipArray[3]);
        return realIp;
    }

  1. 測試示例
public static void main(String[] args) throws Exception {

        SimpleTopN simpleTopN = new SimpleTopN();

        //IP文件切割分片子目錄
        final String shardingFilePath = "/Users/edwin/smart/home/data/ip/analysis/";
        //生成IP文件目錄
        final String filePathName = "/Users/edwin/smart/home/data/ip/ipAddress.txt";

        //生成IP文件
        simpleTopN.generateIpFiles(filePathName, 10000000);

        //切割文件1024個Sharding
        simpleTopN.splitSharding(new File(filePathName), shardingFilePath, 1024);

        //獲取訪問Top10的IP信息
        List<Map.Entry<String, Integer>> topList = simpleTopN.analysis(shardingFilePath, 10);
        for (Map.Entry<String, Integer> list : topList) {
            System.out.println("IP:" + list.getKey() + " ,Count:" + list.getValue());
        }
    }

面對海量數(shù)據(jù)我們一般都會有如下幾種處理思路,但其本質(zhì)都是將數(shù)據(jù)分而治之/hash映射 + hash統(tǒng)計 + 堆/快速/歸并排序笆载,如何取舍根據(jù)具體的業(yè)務(wù)場景而定.

  1. Bloom filter/Bitmap扑馁;
  2. Trie樹/數(shù)據(jù)庫/倒排索引涯呻;
  3. 外排序;
  4. 分布式處理之hadoop/mapreduce
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末檐蚜,一起剝皮案震驚了整個濱河市魄懂,隨后出現(xiàn)的幾起案子沿侈,更是在濱河造成了極大的恐慌闯第,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缀拭,死亡現(xiàn)場離奇詭異咳短,居然都是意外死亡,警方通過查閱死者的電腦和手機蛛淋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進(jìn)店門咙好,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人褐荷,你說我怎么就攤上這事勾效。” “怎么了叛甫?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵层宫,是天一觀的道長。 經(jīng)常有香客問我其监,道長萌腿,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任抖苦,我火速辦了婚禮毁菱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘锌历。我一直安慰自己贮庞,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布究西。 她就那樣靜靜地躺著贸伐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪怔揩。 梳的紋絲不亂的頭發(fā)上捉邢,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天,我揣著相機與錄音商膊,去河邊找鬼伏伐。 笑死,一個胖子當(dāng)著我的面吹牛晕拆,可吹牛的內(nèi)容都是我干的藐翎。 我是一名探鬼主播材蹬,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼吝镣!你這毒婦竟也來了堤器?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤末贾,失蹤者是張志新(化名)和其女友劉穎闸溃,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體拱撵,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡辉川,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了拴测。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片乓旗。...
    茶點故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖集索,靈堂內(nèi)的尸體忽然破棺而出屿愚,到底是詐尸還是另有隱情,我是刑警寧澤务荆,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布妆距,位于F島的核電站,受9級特大地震影響蛹含,放射性物質(zhì)發(fā)生泄漏毅厚。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一浦箱、第九天 我趴在偏房一處隱蔽的房頂上張望吸耿。 院中可真熱鬧,春花似錦酷窥、人聲如沸咽安。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽妆棒。三九已至,卻和暖如春沸伏,著一層夾襖步出監(jiān)牢的瞬間糕珊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工毅糟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留红选,地道東北人。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓姆另,卻偏偏與公主長得像喇肋,于是被迫代替她去往敵國和親坟乾。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,851評論 2 361