Hadoop RPC服務

多個Reducer操作同一張表可能出現的錯誤

假設有兩個Reducer吨岭,R1和R2拉宗,都需要操作MySQL的Dimension表(封裝成getDimensionId()方法):首先查詢該表中是否存在某一維度;如果該維度存在辣辫,則返回該維度的id值(id值為自增主鍵)旦事;如果該維度不存在,則先將該維度數據插入Dimension表络它,再返回該維度的id值族檬。

在大數據量、頻繁讀取同一張表的情況下化戳,R1单料、R2同時調用getDimensionId()方法,可能會出現以下錯誤:Dimension表中有可能存在兩個id不同点楼、但維度相同的數據扫尖。而我們需要的是:相同的維度數據只能出現一次。

解決辦法:自定Hadoop RPC服務掠廓。在服務器端定義getDimensionId()方法换怖,讓R1和R2遠程調用。

項目結構

結構.png

接口定義

  1. 必須繼承自VersionedProtocol
  2. 必須擁有versionID屬性蟀瞧,且名稱不能改變
  3. 定義getDimensionId()方法

/**
 * 提供專門操作dimension表的接口
 * 必須繼承自VersionedProtocol
 *
 * @author liangxw
 */

public interface IDimensionHandler extends VersionedProtocol {
    // 版本id沉颂,屬性名稱不能改變
    long versionID = 1;

    /**
     * 根據dimension的value值獲取id
     * 如果數據庫中有,那么直接返回悦污。如果沒有铸屉,那么進行插入后返回新的id值
     */
    int getDimensionId(Dimension dimension) throws IOException;
}

服務器端

接口實現

  1. 實現了getDimensionId()方法
  2. 使用LinkedHashMap實現了服務器端緩存,提高讀取效率
/**
 * 實現了IDimensionHandler
 */
public class DimensionHandlerImpl implements IDimensionHandler {

    private static final Logger logger = LoggerFactory.getLogger(DimensionHandlerImpl.class);

    private ThreadLocal<Connection> threadLocal = new ThreadLocal<>();

    // 用于服務器端保存維度和對應的id值
    // 超過5000條時切端,刪除舊條目
    private Map<String, Integer> dimensionIdCache = new LinkedHashMap<String, Integer>() {

        private static final long serialVersionUID = -8113529501777031499L;

        private static final int MAX_ENTRIES = 5000;

        @Override
        protected boolean removeEldestEntry(Entry<String, Integer> eldest) {
            // 緩存容量彻坛, 如果這里返回true,那么刪除最早加入的數據
            return this.size() > MAX_ENTRIES;
        }
    };


    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return IDimensionHandler.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
            throws IOException {
        // 返回空即可
        return null;
    }

    @Override
    public int getDimensionId(Dimension dimension) throws IOException {

        // 將維度轉換為字符串(維度的另一種描述方式)
        String dimensionString = buildDimensionString(dimension);

        // 查看服務器端sqlCacheMap緩存中是否有該維度對應的id值,有則返回
        if (this.dimensionIdCache.containsKey(dimensionString)) {
            return this.dimensionIdCache.get(dimensionString);
        }

        // 如果服務器端dimensionStringCache緩存中沒有該維度的值
        Connection conn;
        try {
            // 存放具體執(zhí)行sql語句的數組
            String[] sql;
            if (dimension instanceof DateD) {
                sql = this.buildDateSql();
            } else if (dimension instanceof PlatformD) {
                sql = this.buildPlatformSql();
            } else if (dimension instanceof BrowserD) {
                sql = this.buildBrowserSql();
            } else {
                throw new IOException("不支持此dimensionid的獲取:" + dimension.getClass());
            }
            // 獲取數據庫連接
            conn = this.getConnection();
            int id;
            synchronized (this) {
                // 執(zhí)行sql語句昌屉,獲得id值
                id = this.executeSql(conn, sql, dimension);
            }
            // 將該dimension和id值在服務器端進行緩存
            this.dimensionIdCache.put(dimensionString, id);

            return id;
        } catch (Throwable e) {
            logger.error("操作數據庫出現異常", e);
            throw new IOException(e);
        }
    }

    /**
     * 將dimension轉換為字符串
     * 就是簡單的字符串拼接
     */
    public static String buildDimensionString(Dimension dimension) {

        StringBuilder sb = new StringBuilder();

        if (dimension instanceof DateD) {
            DateD date = (DateD) dimension;
            sb.append("date_dimension")
                    .append(date.getYear())
                    .append(date.getSeason())
                    .append(date.getMonth())
                    .append(date.getWeek())
                    .append(date.getDay())
                    .append(date.getType());
        } else if (dimension instanceof PlatformD) {
            PlatformD platform = (PlatformD) dimension;
            sb.append("platform_dimension")
                    .append(platform.getPlatformName())
                    .append(platform.getPlatformVersion());
        } else if (dimension instanceof BrowserD) {
            BrowserD browser = (BrowserD) dimension;
            sb.append("browser_dimension")
                    .append(browser.getBrowser())
                    .append(browser.getBrowserVersion());
        }

        if (sb.length() == 0) {
            throw new RuntimeException("無法將指定dimension轉換為字符串:" + dimension.getClass());
        }
        return sb.toString();
    }

    /**
     * 創(chuàng)建date dimension相關sql
     */
    private String[] buildDateSql() {
        String querySql = "SELECT `id` FROM `dimension_date` WHERE `year` = ? AND `season` = ? AND `month` = ? AND `week` = ? AND `day` = ? AND `type` = ? AND `calendar` = ?";
        String insertSql = "INSERT INTO `dimension_date`(`year`, `season`, `month`, `week`, `day`, `type`, `calendar`) VALUES(?, ?, ?, ?, ?, ?, ?)";
        return new String[]{querySql, insertSql};
    }

    /**
     * 創(chuàng)建polatform dimension相關sql
     */
    private String[] buildPlatformSql() {
        String querySql = "SELECT `id` FROM `dimension_platform` WHERE `platform_name` = ? AND `platform_version` = ?";
        String insertSql = "INSERT INTO `dimension_platform`(`platform_name`, `platform_version`) VALUES(?, ?)";
        return new String[]{querySql, insertSql};
    }

    /**
     * 創(chuàng)建browser dimension相關sql
     */
    private String[] buildBrowserSql() {
        String querySql = "SELECT `id` FROM `dimension_browser` WHERE `browser_name` = ? AND `browser_version` = ?";
        String insertSql = "INSERT INTO `dimension_browser`(`browser_name`, `browser_version`) VALUES(?, ?)";
        return new String[]{querySql, insertSql};
    }

    /**
     * 連接數據庫
     */
    private Connection getConnection() throws SQLException {
        Connection conn;
        synchronized (this) {

            conn = threadLocal.get();
            try {
                if (conn == null || conn.isClosed() || !conn.isValid(3)) {
                    conn = JdbcManager.getConnection(MYSQL_DATABASE);
                }
            } catch (SQLException e) {
                try {
                    if (conn != null)
                        conn.close();
                } catch (SQLException e1) {
                    // nothings
                }
                conn = JdbcManager.getConnection(MYSQL_DATABASE);
            }
            this.threadLocal.set(conn);
        }
        return conn;
    }

    /**
     * 具體執(zhí)行sql的方法
     */
    @SuppressWarnings("resource")
    private int executeSql(Connection conn, String[] sqls, Dimension dimension)
            throws SQLException {

        PreparedStatement pstmt = null;
        ResultSet rs = null;

        try {
            // 從數據庫中查詢dimension的id值
            pstmt = conn.prepareStatement(sqls[0]); // 創(chuàng)建查詢sql的pstmt對象
            // 設置參數
            this.setArgs(pstmt, dimension);
            rs = pstmt.executeQuery();
            if (rs.next()) {
                // 查詢到即返回
                return rs.getInt(1);
            }
            // 如果該dimension在數據庫中不存在钙蒙,則首先插入該dimension
            // 第二個參數表示是否返回自增長的主鍵的id
            pstmt = conn.prepareStatement(sqls[1], Statement.RETURN_GENERATED_KEYS);
            // 設置參數
            this.setArgs(pstmt, dimension);
            pstmt.executeUpdate();
            // 獲取返回的自增長的id
            rs = pstmt.getGeneratedKeys();
            if (rs.next()) {
                return rs.getInt(1); // 獲取返回值
            }
        } finally {
            if (rs != null) {
                try {
                    rs.close();
                } catch (Throwable e) {
                    // nothing
                }
            }
            if (pstmt != null) {
                try {
                    pstmt.close();
                } catch (Throwable e) {
                    // nothing
                }
            }
        }
        throw new RuntimeException("從數據庫獲取id失敗");
    }

    /**
     * 設置參數
     */
    private void setArgs(PreparedStatement pstmt, Dimension dimension) throws SQLException {
        int i = 0;
        if (dimension instanceof DateD) {
            DateD date = (DateD) dimension;
            pstmt.setInt(++i, date.getYear());
            pstmt.setInt(++i, date.getSeason());
            pstmt.setInt(++i, date.getMonth());
            pstmt.setInt(++i, date.getWeek());
            pstmt.setInt(++i, date.getDay());
            pstmt.setString(++i, date.getType());
            pstmt.setDate(++i, new Date(date.getCalendar().getTime()));
        } else if (dimension instanceof PlatformD) {
            PlatformD platform = (PlatformD) dimension;
            pstmt.setString(++i, platform.getPlatformName());
            pstmt.setString(++i, platform.getPlatformVersion());
        } else if (dimension instanceof BrowserD) {
            BrowserD browser = (BrowserD) dimension;
            pstmt.setString(++i, browser.getBrowser());
            pstmt.setString(++i, browser.getBrowserVersion());
        }
    }
}

接口啟動

  1. 將配置信息保存在HDFS上
    客戶端從HDFS上讀取配置信息
  2. 添加關閉操作的鉤子
/**
 * IDimensionConverter服務接口的啟動類
 *
 * @author liangxw
 */
public class DimensionHandlerServer {

    private static final Logger logger = Logger.getLogger(DimensionHandlerServer.class);

    private AtomicBoolean isRunning = new AtomicBoolean(false); // 標識是否啟動

    private Server server = null; // 服務對象

    private Configuration conf = null;

    // 保存在hdfs上的配置文件
    private static final String CONFIG_SAVE_PATH = "/user/liangxw/rpc/config";

    public static void main(String[] args) {

        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "hdfs://bigdata.liangxw.com:9000");
        conf.set("hbase.zookeeper.quorum", "bigdata.liangxw.com:2181");

        DimensionHandlerServer dhs = new DimensionHandlerServer(conf);

        dhs.startServer();

    }

    // 添加一個鉤子,進行關閉操作
    private DimensionHandlerServer(Configuration conf) {

        this.conf = conf;

        Runtime.getRuntime().addShutdownHook(
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            DimensionHandlerServer.this.stopServer();
                        } catch (IOException e) {
                            // nothing
                        }
                    }
                }));

    }

    /**
     * 關閉服務
     */
    private void stopServer() throws IOException {
        logger.info("關閉服務開始");
        try {
            // 首先移除配置文件
            this.removeRPCConf();
            logger.info("刪除配置文件");
        } finally {
            if (this.server != null) {
                Server tmp = this.server;
                this.server = null;
                tmp.stop();
            }
        }
        logger.info("關閉服務結束");
    }

    /**
     * 啟動服務
     */
    private void startServer() {
        logger.info("開始啟動服務");
        synchronized (this) {
            if (isRunning.get()) {
                // 啟動完成
                return;
            }

            try {
                // 創(chuàng)建一個對象
                IDimensionHandler converter = new DimensionHandlerImpl();
                // 創(chuàng)建服務
                this.server = new RPC.Builder(conf)
                        .setInstance(converter)
                        .setProtocol(IDimensionHandler.class)
                        .setVerbose(true)
                        .build();
                // 獲取ip地址和端口號
                int port = this.server.getPort();
                String address = InetAddress.getLocalHost().getHostAddress();
                // 保存ip地址和端口號
                this.saveRPCConf(address, port);
                // 啟動
                this.server.start();
                // 標識成功
                isRunning.set(true);
                logger.info("啟動服務成功间驮,監(jiān)聽ip地址:" + address + "躬厌,端口:" + port);

                // this.server.stop();

            } catch (Throwable e) {
                isRunning.set(false);
                logger.error("啟動服務發(fā)生異常", e);
                // 關閉可能異常創(chuàng)建的服務
                try {
                    this.stopServer();
                } catch (Throwable ee) {
                    // nothing
                }
                throw new RuntimeException("啟動服務發(fā)生異常", e);
            }
        }

    }

    /**
     * 保存監(jiān)聽信息
     */
    private void saveRPCConf(String address, int port) throws IOException {
        // 刪除已經存在的
        this.removeRPCConf();

        // 進行數據輸出操作
        FileSystem fs = null;
        BufferedWriter bw = null;

        try {
            fs = FileSystem.get(conf);
            Path path = new Path(CONFIG_SAVE_PATH);
            bw = new BufferedWriter(new OutputStreamWriter(fs.create(path)));
            bw.write(address);
            bw.newLine();
            bw.write(String.valueOf(port));
        } finally {
            if (bw != null) {
                try {
                    bw.close();
                } catch (IOException e) {
                    // nothing
                }
            }

            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    // nothing
                }
            }
        }
    }

    /**
     * 刪除監(jiān)聽信息
     */
    private void removeRPCConf() throws IOException {
        FileSystem fs = null;

        try {
            fs = FileSystem.get(conf);
            Path path = new Path(CONFIG_SAVE_PATH);
            if (fs.exists(path)) {
                // 存在,則刪除
                fs.delete(path, true);
            }
        } finally {

            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    // nothing
                }
            }
        }
    }
}

客戶端

創(chuàng)建內部代理類竞帽,增加客戶端緩存功能(客戶端緩存中查詢不到時烤咧,再去服務器端查詢)

/**
 * 操作dimensionConverter相關服務的client端工具類
 *
 * @author liangxw
 */
public class DimensionHandlerClient {
    /**
     * 創(chuàng)建連接對象
     */
    public static IDimensionHandler createDimensionConnector(Configuration conf)
            throws IOException {

        // 讀取配置文件
        String[] serverConf = getDimensionServerConf(conf);
        // 獲取ip和端口號
        String serverIp = serverConf[0]; // 獲取ip地址
        int serverPort = Integer.valueOf(serverConf[1]); // 獲取端口號

        // 創(chuàng)建代理對象
        return new InnerDimensionHandlerProxy(conf, serverIp, serverPort);
    }

    /**
     * 從hdfs文件中讀取配置信息,ip地址和端口號
     */
    private static String[] getDimensionServerConf(Configuration conf) throws IOException {
        FileSystem fs;
        BufferedReader br = null;
        try {

            fs = FileSystem.get(conf);
            br = new BufferedReader(new InputStreamReader(fs.open(new Path(CONFIG_SAVE_PATH))));
            String[] serverConf = new String[2];
            serverConf[0] = br.readLine().trim(); // ip地址
            serverConf[1] = br.readLine().trim(); // 端口號
            return serverConf;

        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (Exception ee) {
                    // nothing
                }
            }
            // 默認配置參數的情況下抢呆,這里不要調用fs.close()方法,因為可能fs這個對象在多個線程中公用
        }
    }


    /**
     * 關閉客戶端連接
     */
    public static void stopDimensionHandlerProxy(IDimensionHandler proxy) {
        if (proxy != null) {
            InnerDimensionHandlerProxy innerProxy = (InnerDimensionHandlerProxy) proxy;
            RPC.stopProxy(innerProxy.proxy);
        }
    }

    /**
     * 內部代理類
     * 增加緩存在本地磁盤的功能
     */
    private static class InnerDimensionHandlerProxy implements IDimensionHandler {

        // 遠程連接代理對象
        private IDimensionHandler proxy = null;

        // 本地緩存dimension和對應的id
        // 最多緩存1000條記錄
        private Map<String, Integer> dimensionIdCache = new LinkedHashMap<String, Integer>() {
            private static final long serialVersionUID = -731083744087467205L;

            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                return this.size() > 1000;
            }
        };

        /**
         * 構造函數笛谦,創(chuàng)建代理對象
         */
        InnerDimensionHandlerProxy(Configuration conf, String address, int port)
                throws IOException {

            this.proxy = RPC.getProxy(
                    IDimensionHandler.class,
                    IDimensionHandler.versionID,
                    new InetSocketAddress(address, port),
                    conf
            );
        }

        @Override
        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
            return this.proxy.getProtocolVersion(protocol, clientVersion);
        }

        @Override
        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
                throws IOException {
            return this.proxy.getProtocolSignature(protocol, clientVersion, clientMethodsHash);
        }

        @Override
        public int getDimensionId(Dimension dimension) throws IOException {

            // 創(chuàng)建cache的key值
            String key = DimensionHandlerImpl.buildDimensionString(dimension);

            // 首先從本地緩存中獲取id值
            Integer value = this.dimensionIdCache.get(key);
            if (value == null) {
                // 本地沒有抱虐,則在服務器端進行獲取
                value = this.proxy.getDimensionId(dimension);
                this.dimensionIdCache.put(key, value);
            }
            return value;
        }

    }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市饥脑,隨后出現的幾起案子恳邀,更是在濱河造成了極大的恐慌,老刑警劉巖灶轰,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谣沸,死亡現場離奇詭異,居然都是意外死亡笋颤,警方通過查閱死者的電腦和手機乳附,發(fā)現死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來伴澄,“玉大人赋除,你說我怎么就攤上這事》橇瑁” “怎么了举农?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經常有香客問我,道長摘能,這世上最難降的妖魔是什么走贪? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮忘闻,結果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己键畴,他們只是感情好,可當我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著起惕,像睡著了一般涡贱。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上惹想,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天问词,我揣著相機與錄音,去河邊找鬼嘀粱。 笑死激挪,一個胖子當著我的面吹牛,可吹牛的內容都是我干的锋叨。 我是一名探鬼主播垄分,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼娃磺!你這毒婦竟也來了薄湿?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤偷卧,失蹤者是張志新(化名)和其女友劉穎豺瘤,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體听诸,經...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡坐求,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了晌梨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片桥嗤。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖派任,靈堂內的尸體忽然破棺而出砸逊,到底是詐尸還是另有隱情,我是刑警寧澤掌逛,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布师逸,位于F島的核電站,受9級特大地震影響豆混,放射性物質發(fā)生泄漏篓像。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一皿伺、第九天 我趴在偏房一處隱蔽的房頂上張望员辩。 院中可真熱鬧,春花似錦鸵鸥、人聲如沸奠滑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宋税。三九已至摊崭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間杰赛,已是汗流浹背呢簸。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留乏屯,地道東北人根时。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像辰晕,于是被迫代替她去往敵國和親蛤迎。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內容