概述
我們知道事務(wù)性在數(shù)據(jù)處理里面是非常重要的阻星,事務(wù)性決定了你最終數(shù)據(jù)的正確與否挠日。在 OLTP
領(lǐng)域里面事務(wù)一般通過底層存儲提供的事務(wù)機(jī)制就可以搞定了。但是在分布式數(shù)據(jù)處理領(lǐng)域里面赊堪,由于數(shù)據(jù)在被很多節(jié)點(diǎn)同時大規(guī)模分布式地寫祝旷,OLTP
里面的簡單事務(wù)處理就很難處理這種場景了履澳。在 Presto 里面也需要解決這樣的問題,我們今天就來看看 Presto 里面提供了什么樣的機(jī)制來達(dá)到數(shù)據(jù)寫入的事務(wù)性怀跛。
物理執(zhí)行計(jì)劃的基本單元: Operator
在正式介紹數(shù)據(jù)事務(wù)性寫入之前距贷,我們先來看看 Presto 里面物理執(zhí)行計(jì)劃的基本單元: Operator
。
我們知道在一般的“語言”里面吻谋,把語言從文本編譯到最終可執(zhí)行程序的過程中都會經(jīng)歷從文本到邏輯執(zhí)行計(jì)劃忠蝗,再從邏輯執(zhí)行計(jì)劃到物理執(zhí)行計(jì)劃的編譯過程,Presto 也不例外漓拾,在 Presto 里面邏輯執(zhí)行計(jì)劃是由 PlanNode
組成的阁最, 物理執(zhí)行計(jì)劃則是由 Operator
組成的戒祠。這里先簡單介紹一下 Operator
的設(shè)計(jì)。
下面是 Operator
接口主要方法的定義(為了行文的簡潔性刪除了一些不那么重要的方法):
public interface Operator extends AutoCloseable {
/**
* Gets the column types of pages produced by this operator.
*/
List<Type> getTypes();
/**
* Notifies the operator that no more pages will be added and the
* operator should finish processing and flush results. This method
* will not be called if the Task is already failed or canceled.
*/
void finish();
/**
* Adds an input page to the operator. This method will only be called if
* {@code needsInput()} returns true.
*/
void addInput(Page page);
/**
* Gets an output page from the operator. If no output data is currently
* available, return null.
*/
Page getOutput();
}
這個接口算是 Presto 里面比較良心的接口了, 接口設(shè)計(jì)優(yōu)良速种、每個方法有注釋姜盈、方法的含義也很好理解。
從上述接口定義可以看出 Operator
定義了一個數(shù)據(jù)處理單元配阵,處理的數(shù)據(jù)是 Page
, 一批數(shù)據(jù)流進(jìn) Operator
馏颂,經(jīng)過一系列處理邏輯,轉(zhuǎn)換成新的一批數(shù)據(jù)流出到下游別的 Operator
繼續(xù)處理棋傍。
數(shù)據(jù)處理的粒度是 Page, 但是不是所有的 Page 都一樣救拉,Page 里面的數(shù)據(jù)到底長什么樣是靠 List<Type> getTypes()
這個方法來描述。Operator 通過 addInput
方法接收來自上游的輸入瘫拣,通過 getOuput
把數(shù)據(jù)吐給下游亿絮。
getOutput()
這個方法名字取得很不好,給人的感覺好像是把這個 Operator 的輸出全部拿出來拂铡,它實(shí)際的作用是拿一個 Page 的輸出出來壹无,后面還會有很多輸出,因此我覺得 nextOutput() 可能更表意一點(diǎn)感帅。
Presto 里面的事務(wù)性數(shù)據(jù)寫入
在 Presto 里面我們是可以事務(wù)性地把數(shù)據(jù)插入底層數(shù)據(jù)存儲的 -- 當(dāng)然斗锭,具體的事務(wù)性還是底層存儲而不是 Presto 能夠決定的,但是 Presto 里面提供了相應(yīng)的機(jī)制以使得我們能夠配合底層存儲引擎一起來實(shí)現(xiàn)事務(wù)性的數(shù)據(jù)寫入失球。這個機(jī)制是這樣的:
這個機(jī)制本身其實(shí)不是很復(fù)雜岖是,在寫開始之前給你一個初始化的鉤子(Hook), 然后你分布式地去寫數(shù)據(jù),最后給你一個集中式提交的鉤子(Hook)实苞,有點(diǎn)兩階段提交的意味豺撑。
TableWriterOperator 這個名字起的有問題,對應(yīng)的讀數(shù)據(jù)的 Operator 是 TableScanOperator, 那么這個怎么也應(yīng)該叫 TableWriteOperator 啊黔牵。
上面說的還是太抽象了聪轿,我們來看個具體的例子來看看,怎么通過這個”框架”來實(shí)現(xiàn)事務(wù)性寫數(shù)據(jù)猾浦。這個例子就是 Presto 的 JDBC Connector:
JDBC Connector 在 BeginTableWrite 的時候執(zhí)行了下面這段代碼(BaseJdbcClient#beginWriteTable
):
String temporaryName = "tmp_presto_" +
UUID.randomUUID().toString().replace("-", "");
StringBuilder sql = new StringBuilder()
.append("CREATE TABLE ")
.append(quoted(catalog, schema, temporaryName))
.append(" (");
ImmutableList.Builder<String> columnList = ImmutableList.builder();
for (ColumnMetadata column : tableMetadata.getColumns()) {
columnList.add(new StringBuilder()
.append(quoted(columnName))
.append(" ")
.append(toSqlType(column.getType()))
.toString());
}
Joiner.on(", ").appendTo(sql, columnList.build());
sql.append(")");
execute(connection, sql.toString());
可以看出這段代碼建了一個臨時表, 在后面的 TableWriterOperator
算子里面陆错,數(shù)據(jù)其實(shí)是在分布式地往這個臨時表里面去寫。
在最后的 TableFinisherOperator
里面執(zhí)行了這么一段代碼(BaseJdbcClient#finishInsertTable
):
String temporaryTable = quoted(handle.getCatalogName(),
getRealSchemaName(handle), handle.getTemporaryTableName());
String targetTable = quoted(handle.getCatalogName(),
getRealSchemaName(handle), getRealTableName(handle));
String insertSql = format("INSERT INTO %s SELECT * FROM %s",
targetTable, temporaryTable);
String cleanupSql = "DROP TABLE " + temporaryTable;
execute(connection, insertSql);
execute(connection, cleanupSql);
從代碼可以看出來金赦,JDBC Connector 在最后把數(shù)據(jù)從 temporaryTable
插入到了 targetTable
里面去了音瓷,并且把臨時表 DROP 掉了,由于最后這個操作是事務(wù)性的夹抗,因此保證了整個數(shù)據(jù)寫入的事務(wù)性绳慎。
目前 Presto 的這個方案其實(shí)是有點(diǎn)臟的,細(xì)心的同學(xué)可能已經(jīng)看出來了,上面圖中的 BeginTableWrite
的形狀跟其它節(jié)點(diǎn)的形狀不一樣杏愤。這是因?yàn)?BeginTableWrite 在 Presto 里面不是一個 Operator
靡砌。 它甚至不是在任務(wù)的運(yùn)行時執(zhí)行的,而是在編譯期執(zhí)行的(也就是下文中說的 planning
)声邦。Presto 的作者在 BeginTableWrite
的文件注釋里面也承認(rèn)了這一點(diǎn)乏奥。
Major HACK alert!!!
This logic should be invoked on query start, not during planning. At that point, the token
returned by beginCreate/beginInsert should be handed down to tasks in a mapping separate
from the plan that links plan nodes to the corresponding token.
總結(jié)
不管怎么樣 Presto 還是能夠支持對數(shù)據(jù)的事務(wù)性插入,雖然方法有點(diǎn)Hack亥曹。從這一點(diǎn)也可以看出 Presto 對待代碼的態(tài)度是實(shí)用性優(yōu)先,先解決問題再說恨诱,方法好不好媳瞪,是不是最正確的方案,以后再說照宝。