如何復(fù)制Pinpoint中一條調(diào)用鏈的完整數(shù)據(jù)
如果你熟悉Pinpoint的話渗稍,你應(yīng)該知道一條調(diào)用鏈包含哪些數(shù)據(jù)
在這里我指的是com.navercorp.pinpoint.web.controller.BusinessTransactionController#transactionInfo方法中查詢會(huì)涉及到的HBase數(shù)據(jù)阴孟。
為什么會(huì)產(chǎn)生這個(gè)需求呢,HBase中的數(shù)據(jù)都是配置了TTL的循集,過一段時(shí)間會(huì)被清理,你可能就和要這條骨骼驚奇的調(diào)用鏈說拜拜了。
如果能夠離線或者保存另外的HBase里,可以更快的復(fù)現(xiàn)場景進(jìn)行調(diào)試和排查局雄。
先看一下到底會(huì)查哪些表吧。
- TraceV2
- ApiMetaData
- StringMetaData
- SqlMetaData_Ver2
Pinpoint在構(gòu)造調(diào)用鏈界面需要的信息的時(shí)候存炮,TraceV2是一行數(shù)據(jù)炬搭,用事務(wù)號(hào)就能查詢出來。
然后遍歷這行數(shù)據(jù)里的Span和SpanEvent穆桂,使用包含的apiId,stringId,sqlId去后三個(gè)表對應(yīng)查詢所關(guān)聯(lián)的數(shù)據(jù)宫盔。
后面三個(gè),一個(gè)復(fù)雜的調(diào)用鏈會(huì)查詢很多次充尉。怎么才能知道呢飘言?
下面只是記錄一下本地試驗(yàn)的方法,僅在測試環(huán)境中使用驼侠。
利用Spring AOP 將hbase查詢結(jié)果 插入到另外的hbase中
我想了下,如果在hbase查詢的時(shí)候進(jìn)行AOP攔截谆吴,并且把數(shù)據(jù)發(fā)送到另外一個(gè)hbase的話倒源,這樣不就能把一條調(diào)用鏈的數(shù)據(jù)給剝離出來了么?
我把將查詢出來的數(shù)據(jù)插入到別的hbase的過程叫做逆轉(zhuǎn)句狼。
我們要尋找一些合適的spring bean笋熬,因?yàn)閟pring的aop只能作用在spring創(chuàng)建的對象上。
首先我注意到 com.navercorp.pinpoint.common.hbase.RowMapper#mapRow
public interface RowMapper<T> {
T mapRow(Result result, int rowNum) throws Exception;
}
第一個(gè)參數(shù)org.apache.hadoop.hbase.client.Result包含了查詢出來的一行數(shù)據(jù)腻菇,現(xiàn)在就差個(gè)表名了胳螟。
仔細(xì)看pinpoint的代碼昔馋,每次執(zhí)行hbase操作前都會(huì)調(diào)用com.navercorp.pinpoint.common.hbase.TableDescriptor#getTableName
獲取表名。
這樣設(shè)置一個(gè)線程上下文(https://github.com/apache/shiro/blob/master/core/src/main/java/org/apache/shiro/util/ThreadContext.java)糖耸,就能將表名和多行數(shù)據(jù)完整聯(lián)系在一起了秘遏。
最后線程上下文里面還需要設(shè)置一個(gè)是否逆轉(zhuǎn)查詢數(shù)據(jù)的標(biāo)志。
對于查詢單條調(diào)用鏈來說就是com.navercorp.pinpoint.web.service.SpanService#selectSpan方法進(jìn)入的時(shí)候開啟標(biāo)志嘉竟。
實(shí)現(xiàn)
首先仿照shiro弄個(gè)線程上下文邦危。
package com.navercorp.pinpoint.web.dao;
import java.util.HashMap;
import java.util.Map;
/**
* @author tankilo
* https://github.com/apache/shiro/blob/master/core/src/main/java/org/apache/shiro/util/ThreadContext.java
*/
public final class ThreadContext {
private ThreadContext() {
}
public static final String REVERSE = "REVERSE";
public static final String TABLE_NAME = "TABLE_NAME";
private static ThreadLocal<Map<String, Object>> resources = new InheritableThreadLocal<Map<String, Object>>() {
@Override
protected Map<String, Object> initialValue() {
return new HashMap<>(8);
}
};
public static void put(String key, Object value) {
if (key == null) {
throw new IllegalArgumentException("key cannot be null");
}
if (value == null) {
remove(key);
return;
}
ensureResourcesInitialized();
resources.get().put(key, value);
}
private static void ensureResourcesInitialized() {
if (resources.get() == null) {
resources.set(new HashMap<>(8));
}
}
public static void remove(String key) {
Map<String, Object> map = resources.get();
if (map != null) {
map.remove(key);
}
}
public static Object get(String key) {
Map<String, Object> map = resources.get();
if (map != null) {
return map.get(key);
} else {
return null;
}
}
private static Object getValue(Object key) {
Map<String, Object> perThreadResources = resources.get();
return perThreadResources != null ? perThreadResources.get(key) : null;
}
private static Boolean getBoolean(String key, Boolean defaultValue) {
Object value = getValue(key);
if (null != value) {
return Boolean.valueOf(value.toString());
}
return defaultValue;
}
private static String getString(String key) {
Object value = getValue(key);
if (null != value) {
return value.toString();
} else {
return null;
}
}
public static void setReverse(boolean reverse) {
put(REVERSE, reverse);
}
public static boolean isReverse() {
return getBoolean(REVERSE, false);
}
public static void setTableName(String tableName) {
put(TABLE_NAME, tableName);
}
public static String getTableName() {
return getString(TABLE_NAME);
}
public static void remove() {
resources.remove();
}
}
切面
@Aspect
public class HbaseTemplateReverseAspect {
@Autowired
@Qualifier("hbaseTemplateReverse")
private HbaseOperations2 template2;
@Autowired
private HbaseTableNameProvider tableNameProvider;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Pointcut("execution(public * com.navercorp.pinpoint.common.hbase.RowMapper.mapRow(..))")
public void pointCut() {
}
@After("pointCut() && args(result,rowNum)")
public void doBefore(Result result, int rowNum) {
if (ThreadContext.isReverse()) {
String tableNameStr = ThreadContext.getTableName();
TableName tableName = tableNameProvider.getTableName(tableNameStr);
Put put = new Put(result.getRow());
Cell[] rawCells = result.rawCells();
for (Cell cell : rawCells) {
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp(), CellUtil.cloneValue(cell));
}
template2.asyncPut(tableName, put);
}
}
@Before("execution(public * com.navercorp.pinpoint.web.dao.hbase.HbaseTraceDaoV2.selectSpan(..))")
public void selectSpan() {
ThreadContext.setTableName(HbaseTable.TRACE_V2.getName());
}
@AfterReturning(returning = "tableName", pointcut = "execution(public * com.navercorp.pinpoint.common.hbase.TableDescriptor.getTableName())")
public void getTableName(TableName tableName) {
ThreadContext.setTableName(tableName.getNameAsString());
}
@Around("execution(public * com.navercorp.pinpoint.web.service.SpanService.selectSpan(..))")
public Object dealContext(ProceedingJoinPoint jp) throws Throwable {
Object result;
try {
ThreadContext.setReverse(true);
result = jp.proceed();
} finally {
ThreadContext.remove();
}
return result;
}
}
遺憾
里面有些曲折,看上面代碼也知道舍扰。
com.navercorp.pinpoint.web.dao.hbase.HbaseTraceDaoV2#spanMapperV2 不是spring bean倦蚪,而是手動(dòng)構(gòu)造的。
這里我就沒有繼續(xù)弄下去了边苹,因?yàn)楦杏Xspring aop的局限性很大,計(jì)劃用java instrumentation api陵且,和pinpoint的agent一樣重新弄下。
代碼提交備份在我的github上 https://github.com/tankilo/pinpoint/tree/spring-aop-hbase-reverse