寫在前面
apollo是攜程開源的配置中心中間件胁赢,目前在github上已經(jīng)擁有23k+的star,非常多的公司也引用了apollo作為配置中心掂铐。
首先放上apollo的架構(gòu)圖,網(wǎng)上有人認(rèn)為apollo是實(shí)習(xí)生用來練手寫的缩赛,這當(dāng)然是玩笑話,但也說明了apollo架構(gòu)的復(fù)雜與難懂撰糠。當(dāng)然了酥馍,如果你對(duì)apollo的源碼理解清楚后,一定能理解此架構(gòu)的用意阅酪。
app cluster namespace item的創(chuàng)建與release發(fā)布
用過apollo的都知道旨袒,發(fā)布一個(gè)配置汁针,都需要先經(jīng)過app->cluster->namespace->item的創(chuàng)建。
先看下app的創(chuàng)建邏輯砚尽,是在portal的AppController控制器里面施无,顯而易見的是AppController是一個(gè)spring mvc的controller,create方法對(duì)應(yīng)了app的創(chuàng)建必孤,會(huì)通過注解進(jìn)行權(quán)限的校驗(yàn)猾骡,將app數(shù)據(jù)保存在數(shù)據(jù)庫,發(fā)送app創(chuàng)建消息敷搪。
@PreAuthorize(value = "@permissionValidator.hasCreateApplicationPermission()")
@PostMapping
public App create(@Valid @RequestBody AppModel appModel) {
App app = transformToApp(appModel);
App createdApp = appService.createAppInLocal(app);
publisher.publishEvent(new AppCreationEvent(createdApp));
Set<String> admins = appModel.getAdmins();
if (!CollectionUtils.isEmpty(admins)) {
rolePermissionService
.assignRoleToUsers(RoleUtils.buildAppMasterRoleName(createdApp.getAppId()),
admins, userInfoHolder.getUser().getUserId());
}
return createdApp;
}
在CreationListener里面兴想,可以看到向spring注冊(cè)了消息監(jiān)聽器,因此當(dāng)app創(chuàng)建時(shí)發(fā)送的消息赡勘,調(diào)用onAppCreationEvent方法嫂便,進(jìn)而調(diào)用appAPI.createApp向adminservice發(fā)送http請(qǐng)求。
@EventListener
public void onAppCreationEvent(AppCreationEvent event) {
AppDTO appDTO = BeanUtils.transform(AppDTO.class, event.getApp());
List<Env> envs = portalSettings.getActiveEnvs();
for (Env env : envs) {
try {
appAPI.createApp(env, appDTO);
} catch (Throwable e) {
logger.error("Create app failed. appId = {}, env = {})", appDTO.getAppId(), env, e);
Tracer.logError(String.format("Create app failed. appId = %s, env = %s", appDTO.getAppId(), env), e);
}
}
}
來到adminservice的AppController闸与,http請(qǐng)求使用create方法處理毙替,一樣的,會(huì)創(chuàng)建app践樱,默認(rèn)cluster厂画,默認(rèn)namespace。
@PostMapping("/apps")
public AppDTO create(@Valid @RequestBody AppDTO dto) {
App entity = BeanUtils.transform(App.class, dto);
App managedEntity = appService.findOne(entity.getAppId());
if (managedEntity != null) {
throw new BadRequestException("app already exist.");
}
entity = adminService.createNewApp(entity);
return BeanUtils.transform(AppDTO.class, entity);
}
@Transactional
public App createNewApp(App app) {
String createBy = app.getDataChangeCreatedBy();
App createdApp = appService.save(app);
String appId = createdApp.getAppId();
appNamespaceService.createDefaultAppNamespace(appId, createBy);
clusterService.createDefaultCluster(appId, createBy);
namespaceService.instanceOfAppNamespaces(appId, ConfigConsts.CLUSTER_NAME_DEFAULT, createBy);
return app;
}
到了cluster的創(chuàng)建拷邢,由portal的ClusterController處理木羹,createCluster->clusterService.createCluster->clusterAPI.create會(huì)向adminservice發(fā)送http請(qǐng)求。
@PreAuthorize(value = "@consumerPermissionValidator.hasCreateClusterPermission(#request, #appId)")
@PostMapping(value = "apps/{appId}/clusters")
public OpenClusterDTO createCluster(@PathVariable String appId, @PathVariable String env,
@Valid @RequestBody OpenClusterDTO cluster, HttpServletRequest request) {
if (!Objects.equals(appId, cluster.getAppId())) {
throw new BadRequestException(String.format(
"AppId not equal. AppId in path = %s, AppId in payload = %s", appId, cluster.getAppId()));
}
String clusterName = cluster.getName();
String operator = cluster.getDataChangeCreatedBy();
RequestPrecondition.checkArguments(!StringUtils.isContainEmpty(clusterName, operator),
"name and dataChangeCreatedBy should not be null or empty");
if (!InputValidator.isValidClusterNamespace(clusterName)) {
throw new BadRequestException(
String.format("Invalid ClusterName format: %s", InputValidator.INVALID_CLUSTER_NAMESPACE_MESSAGE));
}
if (userService.findByUserId(operator) == null) {
throw new BadRequestException("User " + operator + " doesn't exist!");
}
ClusterDTO toCreate = OpenApiBeanUtils.transformToClusterDTO(cluster);
ClusterDTO createdClusterDTO = clusterService.createCluster(Env.fromString(env), toCreate);
return OpenApiBeanUtils.transformFromClusterDTO(createdClusterDTO);
}
public List<ClusterDTO> findClusters(Env env, String appId) {
return clusterAPI.findClustersByApp(appId, env);
}
public ClusterDTO createCluster(Env env, ClusterDTO cluster) {
if (!clusterAPI.isClusterUnique(cluster.getAppId(), env, cluster.getName())) {
throw new BadRequestException(String.format("cluster %s already exists.", cluster.getName()));
}
ClusterDTO clusterDTO = clusterAPI.create(env, cluster);
Tracer.logEvent(TracerEventType.CREATE_CLUSTER, cluster.getAppId(), "0", cluster.getName());
return clusterDTO;
}
再到adminservice的ClusterController解孙,http請(qǐng)求由create方法處理,同樣是將cluster寫入數(shù)據(jù)庫抛人。
@PostMapping("/apps/{appId}/clusters")
public ClusterDTO create(@PathVariable("appId") String appId,
@RequestParam(value = "autoCreatePrivateNamespace", defaultValue = "true") boolean autoCreatePrivateNamespace,
@Valid @RequestBody ClusterDTO dto) {
Cluster entity = BeanUtils.transform(Cluster.class, dto);
Cluster managedEntity = clusterService.findOne(appId, entity.getName());
if (managedEntity != null) {
throw new BadRequestException("cluster already exist.");
}
if (autoCreatePrivateNamespace) {
entity = clusterService.saveWithInstanceOfAppNamespaces(entity);
} else {
entity = clusterService.saveWithoutInstanceOfAppNamespaces(entity);
}
return BeanUtils.transform(ClusterDTO.class, entity);
}
繼續(xù)來到namespace的創(chuàng)建弛姜,由portal的NamespaceController處理,createNamespace->namespaceService.createNamespace->namespaceAPI.createNamespace會(huì)向adminservice發(fā)送http請(qǐng)求妖枚。
@PreAuthorize(value = "@permissionValidator.hasCreateNamespacePermission(#appId)")
@PostMapping("/apps/{appId}/namespaces")
public ResponseEntity<Void> createNamespace(@PathVariable String appId,
@RequestBody List<NamespaceCreationModel> models) {
checkModel(!CollectionUtils.isEmpty(models));
String namespaceName = models.get(0).getNamespace().getNamespaceName();
String operator = userInfoHolder.getUser().getUserId();
roleInitializationService.initNamespaceRoles(appId, namespaceName, operator);
roleInitializationService.initNamespaceEnvRoles(appId, namespaceName, operator);
for (NamespaceCreationModel model : models) {
NamespaceDTO namespace = model.getNamespace();
RequestPrecondition.checkArgumentsNotEmpty(model.getEnv(), namespace.getAppId(),
namespace.getClusterName(), namespace.getNamespaceName());
try {
namespaceService.createNamespace(Env.valueOf(model.getEnv()), namespace);
} catch (Exception e) {
logger.error("create namespace fail.", e);
Tracer.logError(
String.format("create namespace fail. (env=%s namespace=%s)", model.getEnv(),
namespace.getNamespaceName()), e);
}
}
namespaceService.assignNamespaceRoleToOperator(appId, namespaceName,userInfoHolder.getUser().getUserId());
return ResponseEntity.ok().build();
}
public NamespaceDTO createNamespace(Env env, NamespaceDTO namespace) {
if (StringUtils.isEmpty(namespace.getDataChangeCreatedBy())) {
namespace.setDataChangeCreatedBy(userInfoHolder.getUser().getUserId());
}
namespace.setDataChangeLastModifiedBy(userInfoHolder.getUser().getUserId());
NamespaceDTO createdNamespace = namespaceAPI.createNamespace(env, namespace);
Tracer.logEvent(TracerEventType.CREATE_NAMESPACE,
String.format("%s+%s+%s+%s", namespace.getAppId(), env, namespace.getClusterName(),
namespace.getNamespaceName()));
return createdNamespace;
}
來到adminservice的NamespaceController廷臼,http請(qǐng)求由create方法處理,將namespace落庫绝页。
@PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces")
public NamespaceDTO create(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@Valid @RequestBody NamespaceDTO dto) {
Namespace entity = BeanUtils.transform(Namespace.class, dto);
Namespace managedEntity = namespaceService.findOne(appId, clusterName, entity.getNamespaceName());
if (managedEntity != null) {
throw new BadRequestException("namespace already exist.");
}
entity = namespaceService.save(entity);
return BeanUtils.transform(NamespaceDTO.class, entity);
}
最后是item的創(chuàng)建荠商,由portal的ItemController處理,createItem->itemService.createItem->itemAPI.createItem會(huì)向adminservice發(fā)送http請(qǐng)求续誉。
@PreAuthorize(value = "@consumerPermissionValidator.hasModifyNamespacePermission(#request, #appId, #namespaceName, #env)")
@PostMapping(value = "/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items")
public OpenItemDTO createItem(@PathVariable String appId, @PathVariable String env,
@PathVariable String clusterName, @PathVariable String namespaceName,
@RequestBody OpenItemDTO item, HttpServletRequest request) {
RequestPrecondition.checkArguments(
!StringUtils.isContainEmpty(item.getKey(), item.getDataChangeCreatedBy()),
"key and dataChangeCreatedBy should not be null or empty");
if (userService.findByUserId(item.getDataChangeCreatedBy()) == null) {
throw new BadRequestException("User " + item.getDataChangeCreatedBy() + " doesn't exist!");
}
if(!StringUtils.isEmpty(item.getComment()) && item.getComment().length() > 64){
throw new BadRequestException("Comment length should not exceed 64 characters");
}
ItemDTO toCreate = OpenApiBeanUtils.transformToItemDTO(item);
//protect
toCreate.setLineNum(0);
toCreate.setId(0);
toCreate.setDataChangeLastModifiedBy(toCreate.getDataChangeCreatedBy());
toCreate.setDataChangeLastModifiedTime(null);
toCreate.setDataChangeCreatedTime(null);
ItemDTO createdItem = itemService.createItem(appId, Env.fromString(env),
clusterName, namespaceName, toCreate);
return OpenApiBeanUtils.transformFromItemDTO(createdItem);
}
public ItemDTO createItem(String appId, Env env, String clusterName, String namespaceName, ItemDTO item) {
NamespaceDTO namespace = namespaceAPI.loadNamespace(appId, env, clusterName, namespaceName);
if (namespace == null) {
throw new BadRequestException(
"namespace:" + namespaceName + " not exist in env:" + env + ", cluster:" + clusterName);
}
item.setNamespaceId(namespace.getId());
ItemDTO itemDTO = itemAPI.createItem(appId, env, clusterName, namespaceName, item);
Tracer.logEvent(TracerEventType.MODIFY_NAMESPACE, String.format("%s+%s+%s+%s", appId, env, clusterName, namespaceName));
return itemDTO;
}
來到adminservice的ItemController莱没,http請(qǐng)求由create方法處理,將item落庫酷鸦。
@PreAcquireNamespaceLock
@PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items")
public ItemDTO create(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName, @RequestBody ItemDTO dto) {
Item entity = BeanUtils.transform(Item.class, dto);
ConfigChangeContentBuilder builder = new ConfigChangeContentBuilder();
Item managedEntity = itemService.findOne(appId, clusterName, namespaceName, entity.getKey());
if (managedEntity != null) {
throw new BadRequestException("item already exists");
}
entity = itemService.save(entity);
builder.createItem(entity);
dto = BeanUtils.transform(ItemDTO.class, entity);
Commit commit = new Commit();
commit.setAppId(appId);
commit.setClusterName(clusterName);
commit.setNamespaceName(namespaceName);
commit.setChangeSets(builder.build());
commit.setDataChangeCreatedBy(dto.getDataChangeLastModifiedBy());
commit.setDataChangeLastModifiedBy(dto.getDataChangeLastModifiedBy());
commitService.save(commit);
return dto;
}
好了饰躲,可以看到流程大同小異牙咏,都是各種落庫,完成了app->cluster->namespace->item的創(chuàng)建嘹裂。別覺得無聊妄壶,接下來走向關(guān)鍵的發(fā)布流程。
還是先到portal的ReleaseController寄狼,看到發(fā)布是由createRelease進(jìn)行處理的丁寄,首先一樣會(huì)通過releaseService.publish->releaseAPI.createRelease會(huì)向adminservice發(fā)送http請(qǐng)求。
@PreAuthorize(value = "@permissionValidator.hasReleaseNamespacePermission(#appId, #namespaceName, #env)")
@PostMapping(value = "/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/releases")
public ReleaseDTO createRelease(@PathVariable String appId,
@PathVariable String env, @PathVariable String clusterName,
@PathVariable String namespaceName, @RequestBody NamespaceReleaseModel model) {
model.setAppId(appId);
model.setEnv(env);
model.setClusterName(clusterName);
model.setNamespaceName(namespaceName);
if (model.isEmergencyPublish() && !portalConfig.isEmergencyPublishAllowed(Env.valueOf(env))) {
throw new BadRequestException(String.format("Env: %s is not supported emergency publish now", env));
}
ReleaseDTO createdRelease = releaseService.publish(model);
ConfigPublishEvent event = ConfigPublishEvent.instance();
event.withAppId(appId)
.withCluster(clusterName)
.withNamespace(namespaceName)
.withReleaseId(createdRelease.getId())
.setNormalPublishEvent(true)
.setEnv(Env.valueOf(env));
publisher.publishEvent(event);
return createdRelease;
}
public ReleaseDTO publish(NamespaceReleaseModel model) {
Env env = model.getEnv();
boolean isEmergencyPublish = model.isEmergencyPublish();
String appId = model.getAppId();
String clusterName = model.getClusterName();
String namespaceName = model.getNamespaceName();
String releaseBy = StringUtils.isEmpty(model.getReleasedBy()) ?
userInfoHolder.getUser().getUserId() : model.getReleasedBy();
ReleaseDTO releaseDTO = releaseAPI.createRelease(appId, env, clusterName, namespaceName,
model.getReleaseTitle(), model.getReleaseComment(),
releaseBy, isEmergencyPublish);
Tracer.logEvent(TracerEventType.RELEASE_NAMESPACE,
String.format("%s+%s+%s+%s", appId, env, clusterName, namespaceName));
return releaseDTO;
}
來到adminservice的ReleaseController泊愧,http請(qǐng)求由publish方法處理伊磺,將release落庫。然后發(fā)送此release的releaseMessage拼卵。
@Transactional
@PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases")
public ReleaseDTO publish(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName,
@RequestParam("name") String releaseName,
@RequestParam(name = "comment", required = false) String releaseComment,
@RequestParam("operator") String operator,
@RequestParam(name = "isEmergencyPublish", defaultValue = "false") boolean isEmergencyPublish) {
Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);
if (namespace == null) {
throw new NotFoundException(String.format("Could not find namespace for %s %s %s", appId,
clusterName, namespaceName));
}
Release release = releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);
//send release message
Namespace parentNamespace = namespaceService.findParentNamespace(namespace);
String messageCluster;
if (parentNamespace != null) {
messageCluster = parentNamespace.getClusterName();
} else {
messageCluster = clusterName;
}
messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName),
Topics.APOLLO_RELEASE_TOPIC);
return BeanUtils.transform(ReleaseDTO.class, release);
}
看一下messageSender.sendMessage的處理邏輯奢浑,release是包含了app->cluster->namespace的所有item,因此releaseMessageKey是由app腋腮、cluster雀彼、namespace拼接而成的,能夠定位一條release數(shù)據(jù)即寡。
public static String generate(String appId, String cluster, String namespace) {
return STRING_JOINER.join(appId, cluster, namespace);
}
繼續(xù)徊哑,releaseMessage同樣會(huì)落庫獲取到id,然后放入toClean隊(duì)列進(jìn)行處理聪富。
@Override
@Transactional
public void sendMessage(String message, String channel) {
logger.info("Sending message {} to channel {}", message, channel);
if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
logger.warn("Channel {} not supported by DatabaseMessageSender!", channel);
return;
}
Tracer.logEvent("Apollo.AdminService.ReleaseMessage", message);
Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
try {
ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));
toClean.offer(newMessage.getId());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
logger.error("Sending message to database failed", ex);
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
toclean其實(shí)就是將相同releaseMessageKey的releaseMessage刪掉莺丑。讓數(shù)據(jù)庫只有一個(gè)releaseMessage對(duì)應(yīng)最新的release。
private void cleanMessage(Long id) {
//double check in case the release message is rolled back
ReleaseMessage releaseMessage = releaseMessageRepository.findById(id).orElse(null);
if (releaseMessage == null) {
return;
}
boolean hasMore = true;
while (hasMore && !Thread.currentThread().isInterrupted()) {
List<ReleaseMessage> messages = releaseMessageRepository.findFirst100ByMessageAndIdLessThanOrderByIdAsc(
releaseMessage.getMessage(), releaseMessage.getId());
releaseMessageRepository.deleteAll(messages);
hasMore = messages.size() == 100;
messages.forEach(toRemove -> Tracer.logEvent(
String.format("ReleaseMessage.Clean.%s", toRemove.getMessage()), String.valueOf(toRemove.getId())));
}
}
client拉取配置
client拉取配置是從ConfigService開始的墩蔓,getAppConfig可以獲取默認(rèn)namespace下的配置梢莽,當(dāng)然獲取其他namespace下的配置的話,傳namespace進(jìn)來即可奸披。
public static Config getAppConfig() {
return getConfig(ConfigConsts.NAMESPACE_APPLICATION);
}
public static Config getConfig(String namespace) {
return s_instance.getManager().getConfig(namespace);
}
加下來是ConfigManager的初始化昏名,使用了synchronized版的雙重檢查單例模式,最后調(diào)用ApolloInjector.getInstance獲取ConfigManager阵面。
private ConfigManager getManager() {
if (m_configManager == null) {
synchronized (this) {
if (m_configManager == null) {
m_configManager = ApolloInjector.getInstance(ConfigManager.class);
}
}
}
return m_configManager;
}
繼續(xù)調(diào)用ConfigManager的getConfig方法轻局,會(huì)根據(jù)namespace獲取對(duì)應(yīng)的Config,如果沒有Config样刷,會(huì)先加載ConfigFactory仑扑,最后調(diào)用ConfigFactory的create方法獲取Config。
@Override
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);
if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);
if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);
config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}
return config;
}
來到DefaultConfigFactory的create方法置鼻,會(huì)生成RemoteConfigRepository->LocalFileConfigRepository->PropertiesConfigFile->DefaultConfig返回镇饮。
@Override
public Config create(String namespace) {
ConfigFileFormat format = determineFileFormat(namespace);
if (ConfigFileFormat.isPropertiesCompatible(format)) {
return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
}
@Override
public ConfigFile createConfigFile(String namespace, ConfigFileFormat configFileFormat) {
ConfigRepository configRepository = createLocalConfigRepository(namespace);
switch (configFileFormat) {
case Properties:
return new PropertiesConfigFile(namespace, configRepository);
case XML:
return new XmlConfigFile(namespace, configRepository);
case JSON:
return new JsonConfigFile(namespace, configRepository);
case YAML:
return new YamlConfigFile(namespace, configRepository);
case YML:
return new YmlConfigFile(namespace, configRepository);
case TXT:
return new TxtConfigFile(namespace, configRepository);
}
return null;
}
LocalFileConfigRepository createLocalConfigRepository(String namespace) {
if (m_configUtil.isInLocalMode()) {
logger.warn(
"==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
namespace);
return new LocalFileConfigRepository(namespace);
}
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}
RemoteConfigRepository createRemoteConfigRepository(String namespace) {
return new RemoteConfigRepository(namespace);
}
getProperty方法其實(shí)就是獲取配置的方法,從System.getProperty->m_configProperties->System.getenv->m_resourceProperties.getProperty的順序獲取配置箕母。
@Override
public String getProperty(String key, String defaultValue) {
// step 1: check system properties, i.e. -Dkey=value
String value = System.getProperty(key);
// step 2: check local cached properties file
if (value == null && m_configProperties.get() != null) {
value = m_configProperties.get().getProperty(key);
}
/**
* step 3: check env variable, i.e. PATH=...
* normally system environment variables are in UPPERCASE, however there might be exceptions.
* so the caller should provide the key in the right case
*/
if (value == null) {
value = System.getenv(key);
}
// step 4: check properties file from classpath
if (value == null && m_resourceProperties != null) {
value = m_resourceProperties.getProperty(key);
}
if (value == null && m_configProperties.get() == null && m_warnLogRateLimiter.tryAcquire()) {
logger.warn("Could not load config for namespace {} from Apollo, please check whether the configs are released in Apollo! Return default value now!", m_namespace);
}
return value == null ? defaultValue : value;
}
從系統(tǒng)配置盒让、本地配置獲取就不再詳細(xì)分析梅肤,我們直接看怎么從configservice遠(yuǎn)程拉取配置。
看RemoteConfigRepository是通過一個(gè)定時(shí)拉取配置的任務(wù)保證配置的更新和一個(gè)長(zhǎng)輪詢檢測(cè)配置保證配置更新的實(shí)時(shí)性邑茄。
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}
先看長(zhǎng)輪詢檢測(cè)配置的實(shí)現(xiàn)姨蝴,scheduleLongPollingRefresh會(huì)調(diào)用 remoteConfigLongPollService.submit方法。
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}
最終會(huì)調(diào)用到doLongPollingRefresh方法肺缕,會(huì)通過本身的app左医,cluster,namespace同木,還有本地緩存的releaseMessageIdMap浮梢,發(fā)送http請(qǐng)求到configservice,configservice會(huì)返回需要更新的release彤路,然后trySync拉取最新配置即可秕硝。
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
transaction.addData("Url", url);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpUtil.doGet(request, m_responseType);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200 && response.getBody() != null) {
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
notify(lastServiceDto, response.getBody());
}
//try to load balance
if (response.getStatusCode() == 304 && random.nextBoolean()) {
lastServiceDto = null;
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
if (notifications == null || notifications.isEmpty()) {
return;
}
for (ApolloConfigNotification notification : notifications) {
String namespaceName = notification.getNamespaceName();
//create a new list to avoid ConcurrentModificationException
List<RemoteConfigRepository> toBeNotified =
Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
//since .properties are filtered out by default, so we need to check if there is any listener for it
toBeNotified.addAll(m_longPollNamespaces
.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
try {
remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
}
}
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_remoteMessages.set(remoteMessages);
m_executorService.submit(new Runnable() {
@Override
public void run() {
m_configNeedForceRefresh.set(true);
trySync();
}
});
}
config處理長(zhǎng)輪詢檢測(cè)配置變化是在NotificationControllerV2.pollNotification處理,通過統(tǒng)一化watchKeys洲尊,獲取到最新的ReleaseMessageList远豺,通過對(duì)比client端的ReleaseMessageIdList,若當(dāng)前已經(jīng)有比client端更新的ReleaseMessage坞嘀,則調(diào)用DeferredResult的setResult方法躯护,直接返回結(jié)果。否則丽涩,先返回DeferredResult棺滞,等待直至超時(shí),或者等待到有新的ReleaseMessage到來才返回結(jié)果矢渊。
@GetMapping
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "notifications") String notificationsAsString,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "ip", required = false) String clientIp) {
List<ApolloConfigNotification> notifications = null;
try {
notifications =
gson.fromJson(notificationsAsString, notificationsTypeReference);
} catch (Throwable ex) {
Tracer.logError(ex);
}
if (CollectionUtils.isEmpty(notifications)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}
Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
if (CollectionUtils.isEmpty(filteredNotifications)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());
for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
String normalizedNamespace = notificationEntry.getKey();
ApolloConfigNotification notification = notificationEntry.getValue();
namespaces.add(normalizedNamespace);
clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
}
}
Multimap<String, String> watchedKeysMap =
watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
/**
* 1继准、set deferredResult before the check, for avoid more waiting
* If the check before setting deferredResult,it may receive a notification the next time
* when method handleMessage is executed between check and set deferredResult.
*/
deferredResultWrapper
.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResultWrapper.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});
//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespaces, dataCenter);
/**
* 2、check new release
*/
List<ReleaseMessage> latestReleaseMessages =
releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();
List<ApolloConfigNotification> newNotifications =
getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
latestReleaseMessages);
if (!CollectionUtils.isEmpty(newNotifications)) {
deferredResultWrapper.setResult(newNotifications);
}
return deferredResultWrapper.getResult();
}
ReleaseMessageScanner會(huì)定時(shí)掃描ReleaseMessage的變化矮男,調(diào)用ReleaseMessageListener的handleMessage處理變化移必,此方法會(huì)取出對(duì)應(yīng)的DeferredResult,將變化設(shè)置到返回結(jié)果中返回昂灵。
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
return messageScanned == 500;
}
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
try {
listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
}
}
}
}
@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
String content = message.getMessage();
Tracer.logEvent("Apollo.LongPoll.Messages", content);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
if (Strings.isNullOrEmpty(changedNamespace)) {
logger.error("message format invalid - {}", content);
return;
}
if (!deferredResults.containsKey(content)) {
return;
}
//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
configNotification.addMessage(content, message.getId());
//do async notification if too many clients
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
bizConfig.releaseMessageNotificationBatch());
for (int i = 0; i < results.size(); i++) {
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
} catch (InterruptedException e) {
//ignore
}
}
logger.debug("Async notify {}", results.get(i));
results.get(i).setResult(configNotification);
}
});
return;
}
logger.debug("Notify {} clients for key {}", results.size(), content);
for (DeferredResultWrapper result : results) {
result.setResult(configNotification);
}
logger.debug("Notification completed");
}
通知配置變更或者定時(shí)拉取都是通過RemoteConfigRepository的trySync->sync進(jìn)行更新client配置的。
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
ApolloConfig previous = m_configCache.get();
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
進(jìn)入loadApolloConfig方法舞萄,也是通過app眨补,clutster,namespace倒脓,變更的ReleaseMessageIdList向configservice發(fā)送http請(qǐng)求獲取最新配置撑螺。
private ApolloConfig loadApolloConfig() {
if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
String secret = m_configUtil.getAccessKeySecret();
Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
long onErrorSleepTime = 0; // 0 means no sleep
Throwable exception = null;
List<ServiceDTO> configServices = getConfigServices();
String url = null;
retryLoopLabel:
for (int i = 0; i < maxRetries; i++) {
List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
Collections.shuffle(randomConfigServices);
//Access the server which notifies the client first
if (m_longPollServiceDto.get() != null) {
randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
}
for (ServiceDTO configService : randomConfigServices) {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
} catch (InterruptedException e) {
//ignore
}
}
url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());
logger.debug("Loading config from {}", url);
HttpRequest request = new HttpRequest(url);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
transaction.addData("Url", url);
try {
HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
if (response.getStatusCode() == 304) {
logger.debug("Config server responds with 304 HTTP status code.");
return m_configCache.get();
}
ApolloConfig result = response.getBody();
logger.debug("Loaded config for {}: {}", m_namespace, result);
return result;
} catch (ApolloConfigStatusCodeException ex) {
ApolloConfigStatusCodeException statusCodeException = ex;
//config not found
if (ex.getStatusCode() == 404) {
String message = String.format(
"Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
"please check whether the configs are released in Apollo!",
appId, cluster, m_namespace);
statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
message);
}
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
transaction.setStatus(statusCodeException);
exception = statusCodeException;
if(ex.getStatusCode() == 404) {
break retryLoopLabel;
}
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
exception = ex;
} finally {
transaction.complete();
}
// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
}
}
String message = String.format(
"Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
appId, cluster, m_namespace, url);
throw new ApolloConfigException(message, exception);
}
來到configservice,獲取配置的請(qǐng)求在ConfigController的queryConfig方法處理崎弃。根據(jù)client的參數(shù)獲取最新的Release甘晤,組成ApolloConfig返回含潘。
@GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")
public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
@PathVariable String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
@RequestParam(value = "ip", required = false) String clientIp,
@RequestParam(value = "messages", required = false) String messagesAsString,
HttpServletRequest request, HttpServletResponse response) throws IOException {
String originalNamespace = namespace;
//strip out .properties suffix
namespace = namespaceUtil.filterNamespaceName(namespace);
//fix the character case issue, such as FX.apollo <-> fx.apollo
namespace = namespaceUtil.normalizeNamespace(appId, namespace);
if (Strings.isNullOrEmpty(clientIp)) {
clientIp = tryToGetClientIp(request);
}
ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);
List<Release> releases = Lists.newLinkedList();
String appClusterNameLoaded = clusterName;
if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
dataCenter, clientMessages);
if (currentAppRelease != null) {
releases.add(currentAppRelease);
//we have cluster search process, so the cluster name might be overridden
appClusterNameLoaded = currentAppRelease.getClusterName();
}
}
//if namespace does not belong to this appId, should check if there is a public configuration
if (!namespaceBelongsToAppId(appId, namespace)) {
Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
dataCenter, clientMessages);
if (!Objects.isNull(publicRelease)) {
releases.add(publicRelease);
}
}
if (releases.isEmpty()) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
String.format(
"Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
appId, clusterName, originalNamespace));
Tracer.logEvent("Apollo.Config.NotFound",
assembleKey(appId, clusterName, originalNamespace, dataCenter));
return null;
}
auditReleases(appId, clusterName, dataCenter, clientIp, releases);
String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
.collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));
if (mergedReleaseKey.equals(clientSideReleaseKey)) {
// Client side configuration is the same with server side, return 304
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
Tracer.logEvent("Apollo.Config.NotModified",
assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
return null;
}
ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
mergedReleaseKey);
apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));
Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
originalNamespace, dataCenter));
return apolloConfig;
}
最后總結(jié)
1.攜程Apollo通過Release承載一個(gè)namespace的所有配置,給配置降維线婚,帶來了更好的維護(hù)性遏弱。
2.client端通過定時(shí)拉取和長(zhǎng)輪詢檢測(cè)配置變化,做到近實(shí)時(shí)獲取到最新配置塞弊,同時(shí)擁有了兜底獲取配置的手段漱逸。
3.長(zhǎng)輪詢檢測(cè)配置變化configservice使用DeferredResult做異步處理,內(nèi)部的異步處理在外部看來是同步返回游沿。
4.無論是檢測(cè)配置變化的ReleaseMessage還是拉取最新配置的Release饰抒,都采用了一些巧妙的維護(hù)與使用緩存手段,避免流量打到數(shù)據(jù)庫诀黍。