多個Reducer操作同一張表可能出現的錯誤
假設有兩個Reducer，R1和R2，都需要操作MySQL的Dimension表(封裝成getDimensionId()
方法):首先查詢該表中是否存在某一維度；如果該維度存在，則返回該維度的id值(id值為自增主鍵)；如果該維度不存在，則先將該維度數據插入Dimension表，再返回該維度的id值。
在大數據量、頻繁讀取同一張表的情況下，R1、R2同時調用getDimensionId()方法，可能會出現以下錯誤:Dimension表中有可能存在兩個id不同、但維度相同的數據。而我們需要的是:相同的維度數據只能出現一次。
解決辦法:自定義Hadoop RPC服務。在服務器端定義getDimensionId()方法，讓R1和R2遠程調用。
項目結構
接口定義
- 必須繼承自VersionedProtocol
- 必須擁有versionID屬性，且名稱不能改變
- 定義getDimensionId()方法
/**
 * RPC protocol dedicated to operating on the dimension tables.
 *
 * <p>The original author notes that the protocol must extend {@code VersionedProtocol}.
 *
 * @author liangxw
 */
public interface IDimensionHandler extends VersionedProtocol {
    /** Protocol version id. Per the original author's note, this field name must not change. */
    long versionID = 1L;

    /**
     * Resolves the id that belongs to the given dimension value.
     *
     * <p>Contract as described by the original author: when the dimension already exists in the
     * database its id is returned directly; otherwise the dimension is inserted first and the
     * newly generated id is returned.
     *
     * @param dimension the dimension whose id is requested
     * @return the id of the dimension
     * @throws IOException if the id cannot be obtained
     */
    int getDimensionId(Dimension dimension) throws IOException;
}
服務器端
接口實現
- 實現了getDimensionId()方法
- 使用LinkedHashMap實現了服務器端緩存，提高讀取效率
/**
 * Server-side implementation of {@link IDimensionHandler}.
 *
 * <p>A dimension is resolved to its database id in two steps: a bounded in-memory cache is
 * consulted first; on a miss the matching dimension table is queried and, when no row exists,
 * the dimension is inserted and the generated key is returned. The query-then-insert step runs
 * under this instance's monitor, so callers going through the same instance cannot interleave
 * between the query and the insert.
 *
 * <p>Thread-safety: every access to the cache is guarded by the cache's own monitor, because
 * {@code LinkedHashMap} is not safe for concurrent modification.
 */
public class DimensionHandlerImpl implements IDimensionHandler {
    private static final Logger logger = LoggerFactory.getLogger(DimensionHandlerImpl.class);

    // Placed between the parts of a cache key so that adjacent fields cannot run together
    // (without it, e.g. month=1/week=12 and month=11/week=2 would produce the same key).
    private static final char KEY_SEPARATOR = '\u0001';

    // Holds one JDBC connection per calling thread.
    private final ThreadLocal<Connection> threadLocal = new ThreadLocal<>();

    // Server-side cache: dimension key -> id. Once more than MAX_ENTRIES entries are present,
    // the eldest entry is dropped. Guarded by synchronized (dimensionIdCache).
    private final Map<String, Integer> dimensionIdCache = new LinkedHashMap<String, Integer>() {
        private static final long serialVersionUID = -8113529501777031499L;
        private static final int MAX_ENTRIES = 5000;

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
            // Returning true removes the eldest entry.
            return this.size() > MAX_ENTRIES;
        }
    };

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return IDimensionHandler.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
            throws IOException {
        // The original author notes that returning null is sufficient here.
        return null;
    }

    /**
     * Returns the id of the given dimension, inserting the dimension when it is not stored yet.
     *
     * @param dimension the dimension to resolve; must be a DateD, PlatformD or BrowserD
     * @return the id of the dimension
     * @throws IOException if the dimension type is unsupported or the database access fails
     */
    @Override
    public int getDimensionId(Dimension dimension) throws IOException {
        // The string form of the dimension is used as the cache key.
        String dimensionString = buildDimensionString(dimension);

        // Single lookup instead of containsKey + get: the entry could be evicted between the
        // two calls, which would make the unboxing of a null value fail.
        Integer cachedId;
        synchronized (this.dimensionIdCache) {
            cachedId = this.dimensionIdCache.get(dimensionString);
        }
        if (cachedId != null) {
            return cachedId;
        }

        // Cache miss: pick the {query, insert} statement pair for this dimension type.
        String[] sql;
        if (dimension instanceof DateD) {
            sql = this.buildDateSql();
        } else if (dimension instanceof PlatformD) {
            sql = this.buildPlatformSql();
        } else if (dimension instanceof BrowserD) {
            sql = this.buildBrowserSql();
        } else {
            throw new IOException("不支持此dimensionid的獲取:" + dimension.getClass());
        }

        try {
            Connection conn = this.getConnection();
            int id;
            synchronized (this) {
                // Query and insert must not interleave with another caller's query and insert.
                id = this.executeSql(conn, sql, dimension);
            }
            synchronized (this.dimensionIdCache) {
                this.dimensionIdCache.put(dimensionString, id);
            }
            return id;
        } catch (SQLException | RuntimeException e) {
            logger.error("操作數據庫出現異常", e);
            throw new IOException(e);
        }
    }

    /**
     * Converts a dimension into the string used as its cache key.
     *
     * <p>The key is a type prefix followed by the dimension's fields, each preceded by a
     * separator character so that different field combinations cannot yield the same key.
     *
     * @param dimension the dimension to convert
     * @return the cache key
     * @throws RuntimeException if the dimension is not a DateD, PlatformD or BrowserD
     */
    public static String buildDimensionString(Dimension dimension) {
        StringBuilder sb = new StringBuilder();
        if (dimension instanceof DateD) {
            DateD date = (DateD) dimension;
            sb.append("date_dimension")
                    .append(KEY_SEPARATOR).append(date.getYear())
                    .append(KEY_SEPARATOR).append(date.getSeason())
                    .append(KEY_SEPARATOR).append(date.getMonth())
                    .append(KEY_SEPARATOR).append(date.getWeek())
                    .append(KEY_SEPARATOR).append(date.getDay())
                    .append(KEY_SEPARATOR).append(date.getType());
        } else if (dimension instanceof PlatformD) {
            PlatformD platform = (PlatformD) dimension;
            sb.append("platform_dimension")
                    .append(KEY_SEPARATOR).append(platform.getPlatformName())
                    .append(KEY_SEPARATOR).append(platform.getPlatformVersion());
        } else if (dimension instanceof BrowserD) {
            BrowserD browser = (BrowserD) dimension;
            sb.append("browser_dimension")
                    .append(KEY_SEPARATOR).append(browser.getBrowser())
                    .append(KEY_SEPARATOR).append(browser.getBrowserVersion());
        }
        if (sb.length() == 0) {
            throw new RuntimeException("無法將指定dimension轉換為字符串:" + dimension.getClass());
        }
        return sb.toString();
    }

    /**
     * Builds the {query, insert} statements for the date dimension table.
     */
    private String[] buildDateSql() {
        String querySql = "SELECT `id` FROM `dimension_date` WHERE `year` = ? AND `season` = ? AND `month` = ? AND `week` = ? AND `day` = ? AND `type` = ? AND `calendar` = ?";
        String insertSql = "INSERT INTO `dimension_date`(`year`, `season`, `month`, `week`, `day`, `type`, `calendar`) VALUES(?, ?, ?, ?, ?, ?, ?)";
        return new String[]{querySql, insertSql};
    }

    /**
     * Builds the {query, insert} statements for the platform dimension table.
     */
    private String[] buildPlatformSql() {
        String querySql = "SELECT `id` FROM `dimension_platform` WHERE `platform_name` = ? AND `platform_version` = ?";
        String insertSql = "INSERT INTO `dimension_platform`(`platform_name`, `platform_version`) VALUES(?, ?)";
        return new String[]{querySql, insertSql};
    }

    /**
     * Builds the {query, insert} statements for the browser dimension table.
     */
    private String[] buildBrowserSql() {
        String querySql = "SELECT `id` FROM `dimension_browser` WHERE `browser_name` = ? AND `browser_version` = ?";
        String insertSql = "INSERT INTO `dimension_browser`(`browser_name`, `browser_version`) VALUES(?, ?)";
        return new String[]{querySql, insertSql};
    }

    /**
     * Returns the calling thread's database connection, opening a new one when the thread has
     * none or when the existing one is closed or fails a 3-second validity check.
     *
     * @throws SQLException if a new connection cannot be opened
     */
    private Connection getConnection() throws SQLException {
        synchronized (this) {
            Connection conn = threadLocal.get();
            try {
                if (conn == null || conn.isClosed() || !conn.isValid(3)) {
                    // Release the stale connection instead of just dropping the reference.
                    closeQuietly(conn);
                    conn = JdbcManager.getConnection(MYSQL_DATABASE);
                }
            } catch (SQLException e) {
                // Probing or opening failed: discard the old connection and try once more.
                closeQuietly(conn);
                conn = JdbcManager.getConnection(MYSQL_DATABASE);
            }
            this.threadLocal.set(conn);
            return conn;
        }
    }

    /**
     * Looks the dimension up with {@code sqls[0]}; when no row exists, inserts it with
     * {@code sqls[1]} and returns the generated key.
     *
     * @param conn      connection to run the statements on
     * @param sqls      two-element array: query statement, insert statement
     * @param dimension dimension supplying the statement parameters
     * @return the id of the existing or newly inserted row
     * @throws SQLException     if a statement fails
     * @throws RuntimeException if the insert yields no generated key
     */
    private int executeSql(Connection conn, String[] sqls, Dimension dimension)
            throws SQLException {
        Integer existingId = this.selectId(conn, sqls[0], dimension);
        if (existingId != null) {
            return existingId;
        }
        return this.insertAndGetId(conn, sqls[1], dimension);
    }

    /**
     * Runs the query statement and returns the id of the first row, or null when there is none.
     */
    private Integer selectId(Connection conn, String querySql, Dimension dimension)
            throws SQLException {
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            pstmt = conn.prepareStatement(querySql);
            this.setArgs(pstmt, dimension);
            rs = pstmt.executeQuery();
            if (rs.next()) {
                return rs.getInt(1);
            }
            return null;
        } finally {
            closeQuietly(rs);
            closeQuietly(pstmt);
        }
    }

    /**
     * Runs the insert statement and returns the auto-generated key of the new row.
     */
    private int insertAndGetId(Connection conn, String insertSql, Dimension dimension)
            throws SQLException {
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            // RETURN_GENERATED_KEYS asks the driver to hand back the auto-increment id.
            pstmt = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS);
            this.setArgs(pstmt, dimension);
            pstmt.executeUpdate();
            rs = pstmt.getGeneratedKeys();
            if (rs.next()) {
                return rs.getInt(1);
            }
        } finally {
            closeQuietly(rs);
            closeQuietly(pstmt);
        }
        throw new RuntimeException("從數據庫獲取id失敗");
    }

    /**
     * Closes the resource if it is non-null, ignoring any failure: closing is best-effort and
     * must not mask the result or the exception of the surrounding operation.
     */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception ignored) {
            // best-effort close
        }
    }

    /**
     * Binds the dimension's fields to the statement parameters, in the column order used by
     * the statements of the matching build*Sql() method. The same binding serves both the
     * query and the insert statement.
     */
    private void setArgs(PreparedStatement pstmt, Dimension dimension) throws SQLException {
        int i = 0;
        if (dimension instanceof DateD) {
            DateD date = (DateD) dimension;
            pstmt.setInt(++i, date.getYear());
            pstmt.setInt(++i, date.getSeason());
            pstmt.setInt(++i, date.getMonth());
            pstmt.setInt(++i, date.getWeek());
            pstmt.setInt(++i, date.getDay());
            pstmt.setString(++i, date.getType());
            pstmt.setDate(++i, new Date(date.getCalendar().getTime()));
        } else if (dimension instanceof PlatformD) {
            PlatformD platform = (PlatformD) dimension;
            pstmt.setString(++i, platform.getPlatformName());
            pstmt.setString(++i, platform.getPlatformVersion());
        } else if (dimension instanceof BrowserD) {
            BrowserD browser = (BrowserD) dimension;
            pstmt.setString(++i, browser.getBrowser());
            pstmt.setString(++i, browser.getBrowserVersion());
        }
    }
}
接口啟動
- 將配置信息保存在HDFS上，客戶端從HDFS上讀取配置信息
- 添加關閉操作的鉤子
/**
 * Launcher for the IDimensionHandler RPC service.
 *
 * <p>On start-up the server's address and port are written to a file on HDFS
 * ({@link #CONFIG_SAVE_PATH}); on shutdown a JVM shutdown hook removes that file and stops
 * the RPC server.
 *
 * @author liangxw
 */
public class DimensionHandlerServer {
    private static final Logger logger = Logger.getLogger(DimensionHandlerServer.class);

    // HDFS path of the file holding the listening address (line 1) and port (line 2).
    private static final String CONFIG_SAVE_PATH = "/user/liangxw/rpc/config";

    // Whether the service has been started successfully.
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    // The RPC server; null while not built or after it has been stopped.
    private Server server = null;
    private final Configuration conf;

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://bigdata.liangxw.com:9000");
        conf.set("hbase.zookeeper.quorum", "bigdata.liangxw.com:2181");
        DimensionHandlerServer dhs = new DimensionHandlerServer(conf);
        dhs.startServer();
    }

    /**
     * Stores the configuration and registers a shutdown hook that stops the service.
     *
     * @param conf Hadoop configuration used for HDFS access and for building the RPC server
     */
    private DimensionHandlerServer(Configuration conf) {
        this.conf = conf;
        Runtime.getRuntime().addShutdownHook(
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            DimensionHandlerServer.this.stopServer();
                        } catch (IOException e) {
                            // Nothing can be retried at JVM exit, but leave a trace of the failure.
                            logger.error("關閉服務發生異常", e);
                        }
                    }
                }));
    }

    /**
     * Stops the service: removes the published config file, then stops the RPC server.
     * The server is stopped even when removing the file fails.
     *
     * @throws IOException if the config file cannot be removed
     */
    private void stopServer() throws IOException {
        logger.info("關閉服務開始");
        try {
            // Remove the config file first.
            this.removeRPCConf();
            logger.info("刪除配置文件");
        } finally {
            isRunning.set(false);
            if (this.server != null) {
                Server tmp = this.server;
                this.server = null;
                tmp.stop();
            }
        }
        logger.info("關閉服務結束");
    }

    /**
     * Builds and starts the RPC server and publishes its address and port to HDFS.
     * Does nothing when the service is already running.
     *
     * @throws RuntimeException if start-up fails; a partially started server is stopped first
     */
    private void startServer() {
        logger.info("開始啟動服務");
        synchronized (this) {
            if (isRunning.get()) {
                // Already started.
                return;
            }
            try {
                IDimensionHandler converter = new DimensionHandlerImpl();
                this.server = new RPC.Builder(conf)
                        .setInstance(converter)
                        .setProtocol(IDimensionHandler.class)
                        .setVerbose(true)
                        .build();
                // Determine the address and port to publish.
                int port = this.server.getPort();
                String address = InetAddress.getLocalHost().getHostAddress();
                this.saveRPCConf(address, port);
                this.server.start();
                isRunning.set(true);
                logger.info("啟動服務成功,監聽ip地址:" + address + ",端口:" + port);
            } catch (Throwable e) {
                isRunning.set(false);
                logger.error("啟動服務發生異常", e);
                // Tear down whatever was created before the failure.
                try {
                    this.stopServer();
                } catch (Throwable ignored) {
                    // The start-up failure below is the error worth reporting.
                }
                throw new RuntimeException("啟動服務發生異常", e);
            }
        }
    }

    /**
     * Writes the listening address and port to {@link #CONFIG_SAVE_PATH}, replacing any
     * existing file.
     *
     * @param address address to publish (first line of the file)
     * @param port    port to publish (second line of the file)
     * @throws IOException if the file cannot be removed, created, written or flushed
     */
    private void saveRPCConf(String address, int port) throws IOException {
        // Delete an already existing file.
        this.removeRPCConf();
        FileSystem fs = FileSystem.get(conf);
        try {
            Path path = new Path(CONFIG_SAVE_PATH);
            // try-with-resources lets a failing close() propagate: the buffered content is
            // only flushed on close, so swallowing that failure would hide a lost write.
            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(path)))) {
                bw.write(address);
                bw.newLine();
                bw.write(String.valueOf(port));
            }
        } finally {
            try {
                fs.close();
            } catch (IOException ignored) {
                // best-effort close
            }
        }
    }

    /**
     * Deletes {@link #CONFIG_SAVE_PATH} from HDFS when it exists.
     *
     * @throws IOException if the file system cannot be accessed
     */
    private void removeRPCConf() throws IOException {
        FileSystem fs = FileSystem.get(conf);
        try {
            Path path = new Path(CONFIG_SAVE_PATH);
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
        } finally {
            try {
                fs.close();
            } catch (IOException ignored) {
                // best-effort close
            }
        }
    }
}
客戶端
創建內部代理類，增加客戶端緩存功能(客戶端緩存中查詢不到時，再去服務器端查詢)
/**
 * Client-side helper for the IDimensionHandler RPC service.
 *
 * <p>Reads the server's address and port from the config file on HDFS, creates the RPC proxy
 * and wraps it in a handler that caches resolved ids in memory.
 *
 * @author liangxw
 */
public class DimensionHandlerClient {
    /**
     * Creates a handler connected to the dimension RPC server.
     *
     * @param conf Hadoop configuration used for HDFS access and for the RPC proxy
     * @return a caching handler backed by the remote service
     * @throws IOException if the server config cannot be read or is malformed, or the proxy
     *                     cannot be created
     */
    public static IDimensionHandler createDimensionConnector(Configuration conf)
            throws IOException {
        String[] serverConf = getDimensionServerConf(conf);
        String serverIp = serverConf[0];
        int serverPort;
        try {
            serverPort = Integer.parseInt(serverConf[1]);
        } catch (NumberFormatException e) {
            // Report a malformed file through the declared exception type.
            throw new IOException(
                    "Invalid RPC server port in " + CONFIG_SAVE_PATH + ": " + serverConf[1], e);
        }
        return new InnerDimensionHandlerProxy(conf, serverIp, serverPort);
    }

    /**
     * Reads the server address (first line) and port (second line) from the HDFS config file.
     *
     * @return two-element array: trimmed address, trimmed port
     * @throws IOException if the file cannot be read or has fewer than two lines
     */
    private static String[] getDimensionServerConf(Configuration conf) throws IOException {
        BufferedReader br = null;
        try {
            FileSystem fs = FileSystem.get(conf);
            br = new BufferedReader(new InputStreamReader(fs.open(new Path(CONFIG_SAVE_PATH))));
            String address = br.readLine();
            String port = br.readLine();
            if (address == null || port == null) {
                throw new IOException("RPC server config " + CONFIG_SAVE_PATH
                        + " is incomplete: expected an address line and a port line");
            }
            return new String[]{address.trim(), port.trim()};
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (Exception ignored) {
                    // best-effort close; the content has already been read
                }
            }
            // With default configuration, do not call fs.close() here: the original author
            // notes that the fs object may be shared between several threads.
        }
    }

    /**
     * Closes the RPC connection behind the given handler. Does nothing for null.
     *
     * @param proxy a handler obtained from {@link #createDimensionConnector}; any other
     *              IDimensionHandler is passed to {@code RPC.stopProxy} directly
     */
    public static void stopDimensionHandlerProxy(IDimensionHandler proxy) {
        if (proxy == null) {
            return;
        }
        if (proxy instanceof InnerDimensionHandlerProxy) {
            RPC.stopProxy(((InnerDimensionHandlerProxy) proxy).proxy);
        } else {
            // Not our wrapper: avoid a ClassCastException and stop the object itself.
            RPC.stopProxy(proxy);
        }
    }

    /**
     * Wrapper around the remote proxy that adds a bounded in-memory cache of resolved ids.
     */
    private static class InnerDimensionHandlerProxy implements IDimensionHandler {
        // Proxy for the remote service.
        private final IDimensionHandler proxy;

        // Local cache: dimension key -> id. Once more than 1000 entries are present, the
        // eldest entry is dropped.
        private final Map<String, Integer> dimensionIdCache = new LinkedHashMap<String, Integer>() {
            private static final long serialVersionUID = -731083744087467205L;

            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                return this.size() > 1000;
            }
        };

        /**
         * Creates the RPC proxy for the server at the given address and port.
         */
        InnerDimensionHandlerProxy(Configuration conf, String address, int port)
                throws IOException {
            this.proxy = RPC.getProxy(
                    IDimensionHandler.class,
                    IDimensionHandler.versionID,
                    new InetSocketAddress(address, port),
                    conf
            );
        }

        @Override
        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
            return this.proxy.getProtocolVersion(protocol, clientVersion);
        }

        @Override
        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
                throws IOException {
            return this.proxy.getProtocolSignature(protocol, clientVersion, clientMethodsHash);
        }

        /**
         * Returns the id from the local cache, asking the server (and caching the answer)
         * only on a miss.
         */
        @Override
        public int getDimensionId(Dimension dimension) throws IOException {
            String key = DimensionHandlerImpl.buildDimensionString(dimension);
            Integer value = this.dimensionIdCache.get(key);
            if (value == null) {
                // Not cached locally: fetch from the server.
                value = this.proxy.getDimensionId(dimension);
                this.dimensionIdCache.put(key, value);
            }
            return value;
        }
    }
}