背景
每次用到的時(shí)候新創(chuàng)建一個(gè)狀態(tài)機(jī)罢艾,太奢侈了臀玄,官方文檔里面也提到過這點(diǎn)渊额。
而且創(chuàng)建出來的實(shí)例况木,其狀態(tài)也跟當(dāng)前訂單的不符;spring statemachine暫時(shí)不支持每次創(chuàng)建時(shí)指定當(dāng)前狀態(tài)旬迹,所以對狀態(tài)機(jī)引擎實(shí)例的持久化火惊,就成了必須要考慮的問題。(不過在后續(xù)版本有直接指定狀態(tài)的方式奔垦,這個(gè)后面會寫)
擴(kuò)展一下
這里擴(kuò)展說明一下屹耐,狀態(tài)機(jī)引擎的持久化一直是比較容易引起討論的,因?yàn)楹芏鄨鼍安⒉幌M俣啻鎯σ恍┲虚g非業(yè)務(wù)數(shù)據(jù)椿猎,之前在淘寶工作時(shí)惶岭,淘寶的訂單系統(tǒng)tradeplatform自己實(shí)現(xiàn)了一套workflowEngine寿弱,其實(shí)說白了也就是一套狀態(tài)機(jī)引擎,所有的配置都放在xml中按灶,每次每個(gè)環(huán)節(jié)的請求過來症革,都會重新創(chuàng)建一個(gè)狀態(tài)機(jī)引擎實(shí)例,并根據(jù)當(dāng)前的訂單狀態(tài)來設(shè)置引擎實(shí)例的狀態(tài)鸯旁。
workflowEngine沒有做持久化噪矛,私下里猜測下這樣實(shí)現(xiàn)的原因:
1、淘系數(shù)據(jù)量太大羡亩,一天幾千萬筆訂單摩疑,額外的信息存儲就要耗費(fèi)很多存儲資源危融;
2畏铆、完全自主開發(fā)的狀態(tài)機(jī)引擎,可定制化比較強(qiáng)吉殃,根據(jù)自己的業(yè)務(wù)需要可以按自己的需要處理辞居。
而反過來,spring statemachine并不支持隨意指定初始狀態(tài)蛋勺,每次創(chuàng)建都是固定的初始化狀態(tài)瓦灶,其實(shí)也只是有好處的,標(biāo)準(zhǔn)版流程抱完,而且可以保證安全贼陶,每個(gè)節(jié)點(diǎn)都是按照事先定義好的流程跑下來,而不是隨意指定巧娱。所以碉怔,狀態(tài)機(jī)引擎實(shí)例的持久化,我們這次的主題禁添,那就繼續(xù)聊下去吧撮胧。
持久化
spring statemachine 本身支持了內(nèi)存、redis及db的持久化老翘,內(nèi)存持久化就不說了芹啥,看源碼實(shí)現(xiàn)就是放在了hashmap里,平時(shí)也沒誰項(xiàng)目中可以這么奢侈铺峭,啥啥都放在內(nèi)存中墓怀,而且一旦重啟…..??。下面詳細(xì)說下利用redis進(jìn)行的持久化操作卫键。
依賴引入
spring statemachine 本身是提供了一個(gè)redis存儲的組件的傀履,在1.2.10.RELEASE版本中,這個(gè)組件需要通過依賴引入永罚,同時(shí)需要引入的還有序列化的組件kyro啤呼、data-common:
gradle引入依賴 (build.gradle 或者 libraries.gradle卧秘,由自己項(xiàng)目的gradle組織方式來定):
compile 'org.springframework.statemachine:spring-statemachine-core:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-data-common:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-kyro:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-redis:1.2.10.RELEASE'
當(dāng)然如果是maven的話,一樣的官扣,pom.xml如下:
<dependencies>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-data-common</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-kyro</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-redis</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
</dependencies>
先把持久化的調(diào)用軌跡說明下
說明:
spring statemachine持久化時(shí)翅敌,采用了三層結(jié)構(gòu)設(shè)計(jì),persister —>persist —>repository惕蹄。
- 其中persister中封裝了write和restore兩個(gè)方法蚯涮,分別用于持久化寫及反序列化讀出。
- persist只是一層皮卖陵,主要還是調(diào)用repository中的實(shí)際實(shí)現(xiàn)遭顶;但是在這里,由于redis存儲不保證百分百數(shù)據(jù)安全泪蔫,所以我實(shí)現(xiàn)了一個(gè)自定義的persist棒旗,其中封裝了數(shù)據(jù)寫入db、從db中讀取的邏輯撩荣。
- repository中做了兩件事兒
- 序列化/反序列化數(shù)據(jù)铣揉,將引擎實(shí)例與二進(jìn)制數(shù)組互相轉(zhuǎn)換
- 讀、寫redis
詳細(xì)的實(shí)現(xiàn)
Persister
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.statemachine.redis.RedisStateMachinePersister;
@Configuration
public class BizOrderRedisStateMachinePersisterConfig {
@Autowired
private StateMachinePersist bizOrderRedisStateMachineContextPersist;
@Bean(name = "bizOrderRedisStateMachinePersister",autowire = Autowire.BY_TYPE)
public StateMachinePersister<BizOrderStatusEnum, BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister() {
return new RedisStateMachinePersister<>(bizOrderRedisStateMachineContextPersist);
}
}
這里采用官方samples中初始化的方式餐曹,通過@Bean注解來創(chuàng)建一個(gè)RedisStateMachinePersister實(shí)例逛拱,注意其中傳遞進(jìn)去的Persist為自定義的bizOrderRedisStateMachineContextPersist
Persist
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageHeaders;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.kryo.MessageHeadersSerializer;
import org.springframework.statemachine.kryo.StateMachineContextSerializer;
import org.springframework.statemachine.kryo.UUIDSerializer;
import org.springframework.statemachine.redis.RedisStateMachineContextRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Base64;
import java.util.UUID;
@Component("bizOrderRedisStateMachineContextPersist")
public class BizOrderRedisStateMachineContextPersist implements StateMachinePersist<BizOrderStatusEnum, BizOrderStatusChangeEventEnum, String> {
@Autowired
@Qualifier("redisStateMachineContextRepository")
private RedisStateMachineContextRepository<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> redisStateMachineContextRepository;
@Autowired
private BizOrderStateMachineContextRepository bizOrderStateMachineContextRepository;
// 加入存儲到DB的數(shù)據(jù)repository, biz_order_state_machine_context表結(jié)構(gòu):
// bizOrderId
// contextStr
// curStatus
// updateTime
/**
* Write a {@link StateMachineContext} into a persistent store
* with a context object {@code T}.
*
* @param context the context
* @param contextObj the context ojb
* @throws Exception the exception
*/
@Override
@Transactional
public void write(StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context, String contextObj) throws Exception {
redisStateMachineContextRepository.save(context, contextObj);
// save to db
BizOrderStateMachineContext queryResult = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);
if (null == queryResult) {
BizOrderStateMachineContext bosmContext = new BizOrderStateMachineContext(contextObj,
context.getState().getStatus(), serialize(context));
bizOrderStateMachineContextRepository.insertSelective(bosmContext);
} else {
queryResult.setCurOrderStatus(context.getState().getStatus());
queryResult.setContext(serialize(context));
bizOrderStateMachineContextRepository.updateByPrimaryKeySelective(queryResult);
}
}
/**
* Read a {@link StateMachineContext} from a persistent store
* with a context object {@code T}.
*
* @param contextObj the context ojb
* @return the state machine context
* @throws Exception the exception
*/
@Override
public StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> read(String contextObj) throws Exception {
StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context = redisStateMachineContextRepository.getContext(contextObj);
//redis 訪緩存擊穿
if (null != context && BizOrderConstants.STATE_MACHINE_CONTEXT_ISNULL.equalsIgnoreCase(context.getId())) {
return null;
}
//redis 為空走db
if (null == context) {
BizOrderStateMachineContext boSMContext = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);
if (null != boSMContext) {
context = deserialize(boSMContext.getContext());
redisStateMachineContextRepository.save(context, contextObj);
} else {
context = new StateMachineContextIsNull();
redisStateMachineContextRepository.save(context, contextObj);
}
}
return context;
}
private String serialize(StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context) throws UnsupportedEncodingException {
Kryo kryo = kryoThreadLocal.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Output output = new Output(out);
kryo.writeObject(output, context);
output.close();
return Base64.getEncoder().encodeToString(out.toByteArray());
}
@SuppressWarnings("unchecked")
private StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> deserialize(String data) throws UnsupportedEncodingException {
if (StringUtils.isEmpty(data)) {
return null;
}
Kryo kryo = kryoThreadLocal.get();
ByteArrayInputStream in = new ByteArrayInputStream(Base64.getDecoder().decode(data));
Input input = new Input(in);
return kryo.readObject(input, StateMachineContext.class);
}
private static final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {
@SuppressWarnings("rawtypes")
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
kryo.addDefaultSerializer(StateMachineContext.class, new StateMachineContextSerializer());
kryo.addDefaultSerializer(MessageHeaders.class, new MessageHeadersSerializer());
kryo.addDefaultSerializer(UUID.class, new UUIDSerializer());
return kryo;
}
};
}
說明:
如果只是持久化到redis中,那么BizOrderStateMachineContextRepository相關(guān)的所有內(nèi)容均可刪除台猴。不過由于redis無法承諾百分百的數(shù)據(jù)安全朽合,所以我這里做了兩層持久化,redis+db
-
存入redis中的數(shù)據(jù)默認(rèn)采用kryo來序列化及反序列化饱狂,RedisStateMachineContextRepository中實(shí)現(xiàn)了對應(yīng)代碼曹步。但是spring statemachine默認(rèn)的db存儲比較復(fù)雜,需要?jiǎng)?chuàng)建多張表嗡官,參加下圖:
這里需要額外創(chuàng)建5張表箭窜,分別存儲Action\Guard\State\StateMachine\Transition,比較復(fù)雜衍腥。
-
所以這里創(chuàng)建了一張表biz_order_state_machine_context磺樱,結(jié)構(gòu)很簡單:bizOrderId,contextStr,curStatus,updateTime,其中關(guān)鍵是contextStr婆咸,用于存儲與redis中相同的內(nèi)容
Repository
有兩個(gè)repository竹捉,一個(gè)是spring statemachine提供的redisRepo,另一個(gè)則是項(xiàng)目中基于mybatis的repo尚骄,先是db-repo:
import org.apache.ibatis.annotations.Param; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Repository; import java.util.List; @Repository public interface BizOrderStateMachineContextRepository { int deleteByPrimaryKey(Long id); BizOrderStateMachineContext selectByOrderId(String bizOrderId); int updateByPrimaryKey(BizOrderStateMachineContext BizOrderStateMachineContext); int updateByPrimaryKeySelective(BizOrderStateMachineContext BizOrderStateMachineContext); int insertSelective(BizOrderStateMachineContext BizOrderStateMachineContext); int selectCount(BizOrderStateMachineContext BizOrderStateMachineContext); List<BizOrderStateMachineContext> selectPage(@Param("BizOrderStateMachineContext") BizOrderStateMachineContext BizOrderStateMachineContext, @Param("pageable") Pageable pageable); }
然后是redisRepo
import org.springframework.beans.factory.annotation.Autowire; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.statemachine.redis.RedisStateMachineContextRepository; @Configuration public class BizOrderRedisStateMachineRepositoryConfig { /** * 接入asgard后块差,redis的connectionFactory可以通過serviceName + InnerConnectionFactory來注入 */ @Autowired private RedisConnectionFactory finOrderRedisInnerConnectionFactory; @Bean(name = "redisStateMachineContextRepository", autowire = Autowire.BY_TYPE) public RedisStateMachineContextRepository<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> redisStateMachineContextRepository() { return new RedisStateMachineContextRepository<>(finOrderRedisInnerConnectionFactory); } }
使用方式
@Autowired @Qualifier("bizOrderRedisStateMachinePersister") private StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister; ...... bizOrderRedisStateMachinePersister.persist(stateMachine, request.getBizCode()); ...... StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine = bizOrderRedisStateMachinePersister.restore(srcStateMachine,statusRequest.getBizCode()); ......
支持,關(guān)于spring statemachine的持久化就交代完了,下面就是最關(guān)鍵的憨闰,怎么利用狀態(tài)機(jī)來串聯(lián)業(yè)務(wù)状蜗,下一節(jié)將會詳細(xì)描述。