本文基于 apollo-client 2.1.0 版本源码进行分析
Apollo 是携程开源的配置中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性。
Apollo支持4个维度管理Key-Value格式的配置:
application (应用)
environment (环境)
cluster (集群)
namespace (命名空间)
同时,Apollo基于开源模式开发,开源地址:https://github.com/ctripcorp/apollo
一. SpringBoot集成Apollo 1.1 引入Apollo客户端依赖 1 2 3 4 5 <dependency > <groupId > com.ctrip.framework.apollo</groupId > <artifactId > apollo-client</artifactId > <version > 2.1.0</version > </dependency >
1.2 配置apollo 1 2 3 4 5 6 7 8 9 10 11 app: id: apollo-test apollo: meta: http://10.10.10.12:8080 autoUpdateInjectedSpringProperties: true bootstrap: enabled: true namespaces: application.yaml eagerLoad: enabled: true
app.id
:AppId是应用的身份信息,是配置中心获取配置的一个重要信息。
apollo.bootstrap.enabled
:在应用启动阶段,向Spring容器注入被托管的 application.properties
文件的配置信息。
apollo.bootstrap.eagerLoad.enabled
:将 Apollo 配置加载提到初始化日志系统之前。将Apollo配置加载提到初始化日志系统之前从1.2.0版本开始,如果希望把日志相关的配置(如 1ogging.level.root=info
或 1ogback-spring.xml
中的参数)也放在Apollo管理,来使Apollo的加载顺序放到日志系统加载之前,不过这会导致Apollo的启动过程无法通过日志的方式输出(因为执行Apollo加载的时的日志输出便没有任何内容)。
1.3 启动项目 启动项目后,我们更改 apollo 中的配置,SpringBoot中的配置会自动更新:
1 [Apollo-Config-1] c.f.a.s.p.AutoUpdateConfigChangeListener : Auto update apollo changed value successfully, new value: hahhahaha12311, key: test.hello, beanName: mongoController, field: cn.bigcoder.mongo.mongodemo.web.MongoController.hello
二. SpringBoot如何在启动时加载Apollo配置
2.1 ApolloApplicationContextInitializer spring.factories
文件 是 SpringBoot 中实现 SPI 机制的重要组成,在这个文件中可以定义SpringBoot各种扩展点的实现类。Apollo 客户端 与 SpringBoot 的集成就借助了这个机制,apollo-client
包中的 META-INF/spring.factories
文件配置如下:
1 2 3 4 5 6 org.springframework.boot.autoconfigure.EnableAutoConfiguration =\ com.ctrip.framework.apollo.spring.boot.ApolloAutoConfiguration org.springframework.context.ApplicationContextInitializer =\ com.ctrip.framework.apollo.spring.boot.ApolloApplicationContextInitializer org.springframework.boot.env.EnvironmentPostProcessor =\ com.ctrip.framework.apollo.spring.boot.ApolloApplicationContextInitializer
ApolloApplicationContextInitializer
实现了 ApplicationContextInitializer
和 EnvironmentPostProcessor
两个扩展点,使得 apollo-client
能在Spring容器启动前从Apollo Server中加载配置。
EnvironmentPostProcessor
:当我们想在Bean中使用配置属性时,那么我们的配置属性必须在Bean实例化之前就放入到Spring到Environment中。即我们的接口需要在 application context refreshed 之前进行调用,而 EnvironmentPostProcessor
正好可以实现这个功能。
ApplicationContextInitializer
:是Spring框架原有的概念,这个类的主要目的就是在 ConfigurableApplicationContext
类型(或者子类型)的 ApplicationContext
做refresh之前,允许我们对 ConfigurableApplicationContext
的实例做进一步的设置或者处理。
两者虽都实现在 Application Context 做 refresh 之前加载配置,但是 EnvironmentPostProcessor
的扩展点相比 ApplicationContextInitializer
更加靠前,使得 Apollo 配置加载能够提到初始化日志系统之前。
ApolloApplicationContextInitializer.postProcessEnvironment
扩展点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override public void postProcessEnvironment (ConfigurableEnvironment configurableEnvironment, SpringApplication springApplication) { initializeSystemProperty(configurableEnvironment); Boolean eagerLoadEnabled = configurableEnvironment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_EAGER_LOAD_ENABLED, Boolean.class, false ); if (!eagerLoadEnabled) { return ; } Boolean bootstrapEnabled = configurableEnvironment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false ); if (bootstrapEnabled) { DeferredLogger.enable(); initialize(configurableEnvironment); } }
ApolloApplicationContextInitializer.initialize
扩展点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void initialize (ConfigurableApplicationContext context) { ConfigurableEnvironment environment = context.getEnvironment(); if (!environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false )) { logger.debug("Apollo bootstrap config is not enabled for context {}, see property: ${{}}" , context, PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED); return ; } logger.debug("Apollo bootstrap config is enabled for context {}" , context); initialize(environment); }
两个扩展点最终都会调用 ApolloApplicationContextInitializer#initialize(ConfigurableEnvironment environment)
方法初始化 apollo client,并加载远端配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 protected void initialize (ConfigurableEnvironment environment) { final ConfigUtil configUtil = ApolloInjector.getInstance(ConfigUtil.class); if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) { DeferredLogger.replayTo(); if (configUtil.isOverrideSystemProperties()) { PropertySourcesUtil.ensureBootstrapPropertyPrecedence(environment); } return ; } String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION); logger.debug("Apollo bootstrap namespaces: {}" , namespaces); List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces); CompositePropertySource composite; if (configUtil.isPropertyNamesCacheEnabled()) { composite = new CachedCompositePropertySource (PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME); } else { composite = new CompositePropertySource (PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME); } for (String namespace : namespaceList) { Config config = ConfigService.getConfig(namespace); composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config)); } if (!configUtil.isOverrideSystemProperties()) { if (environment.getPropertySources().contains(StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME)) { environment.getPropertySources().addAfter(StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME, composite); return ; } } environment.getPropertySources().addFirst(composite); }
流程如下:
因为有两个不同的触发点,所以该方法首先检查 Spring 的 Environment 环境中是否已经有了 key 为 ApolloBootstrapPropertySources
的目标属性,有的话就不必往下处理,直接 return。
从 Environment 环境中获取 apollo.bootstrap.namespaces
属性配置的启动命名空间字符串,如果没有的话就取默认的 application 命名空间。
按逗号分割处理配置的启动命名空间字符串,然后调用 ConfigService#getConfig()
依次拉取各个命名空间的远端配置,下节详细分析这部分
创建 CompositePropertySource
复合属性源,因为 apollo-client 启动时可以加载多个命名空间的配置,每个命名空间对应一个 PropertySource
,而多个 PropertySource
就会被封装在 CompositePropertySource
对象中,若需要获取apollo中配置的属性时,就会遍历多个命名空间所对应的 PropertySource
,找到对应属性后就会直接返回,这也意味着,先加载的 namespace
中的配置具有更高优先级:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class CompositePropertySource extends EnumerablePropertySource <Object> { private final Set<PropertySource<?>> propertySources = new LinkedHashSet <>(); @Override @Nullable public Object getProperty (String name) { for (PropertySource<?> propertySource : this .propertySources) { Object candidate = propertySource.getProperty(name); if (candidate != null ) { return candidate; } } return null ; } }
调用 ConfigPropertySourceFactory#getConfigPropertySource()
缓存从远端拉取的配置,并将其包装为 PropertySource
,最终将所有拉取到的远端配置聚合到一个以 ApolloBootstrapPropertySources
为 key 的属性源包装类 CompositePropertySource
的内部。
1 2 3 4 5 6 7 8 9 public ConfigPropertySource getConfigPropertySource (String name, Config source) { ConfigPropertySource configPropertySource = new ConfigPropertySource (name, source); configPropertySources.add(configPropertySource); return configPropertySource; }
将 CompositePropertySource
属性源包装类添加到 Spring 的 Environment 环境中,注意是插入在属性源列表的头部,因为取属性的时候其实是遍历这个属性源列表来查找,找到即返回,所以出现同名属性时,前面的优先级更高。这样在当本地配置文件和Apollo中配置了同名参数时会使得Apollo中的优先级更高。
2.2 从远端加载配置 在 ApolloApplicationContextInitializer.initialize
中会调用 ConfigService.getConfig()
加载远端命名空间配置。getConfig方法处理流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static Config getConfig (String namespace) { return s_instance.getManager().getConfig(namespace); } private ConfigManager getManager () { if (m_configManager == null ) { synchronized (this ) { if (m_configManager == null ) { m_configManager = ApolloInjector.getInstance(ConfigManager.class); } } } return m_configManager; }
s_instance.getManager()
实际通过 ApolloInjector
去获取 ConfigManager
实例,ApolloInjector
其实采用了 Java 中的 ServiceLoader
机制,此处不作讨论,读者有兴趣可自行搜索
ConfigManager
其实只有一个实现类,此处最终将调用到 DefaultConfigManager#getConfig()
方法。
DefaultConfigManager#getConfig()
方法处理逻辑较为清晰,重点如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public Config getConfig (String namespace) { Config config = m_configs.get(namespace); if (config == null ) { synchronized (this ) { config = m_configs.get(namespace); if (config == null ) { ConfigFactory factory = m_factoryManager.getFactory(namespace); config = factory.create(namespace); m_configs.put(namespace, config); } } }
首先从缓存中获取配置,缓存中没有则从远程拉取,注意此处在 synchronized 代码块内部也判了一次空,采用了双重检查锁机制。
远程拉取配置首先需要通过 ConfigFactoryManager#getFactory()
方法获取 ConfigFactory
实例,这里实际上获取的是DefaultConfigFactory
,再通过 DefaultConfigFactory#create()
去获取 Apollo Server 中的配置。
在 DefaultConfigFactory#create()
中会根据加载namespace类型,创建对应的 ConfigRepository
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public Config create (String namespace) { ConfigFileFormat format = determineFileFormat(namespace); ConfigRepository configRepository = null ; if (ConfigFileFormat.isPropertiesCompatible(format) && format != ConfigFileFormat.Properties) { configRepository = createPropertiesCompatibleFileConfigRepository(namespace, format); } else { configRepository = createConfigRepository(namespace); } logger.debug("Created a configuration repository of type [{}] for namespace [{}]" , configRepository.getClass().getName(), namespace); return this .createRepositoryConfig(namespace, configRepository); }
我们就以 properties
配置类型为例,会调用 DefaultConfigFactory.createConfigRepository
创建 ConfigRepository
:
1 2 3 4 5 6 7 8 ConfigRepository createConfigRepository (String namespace) { if (m_configUtil.isPropertyFileCacheEnabled()) { return createLocalConfigRepository(namespace); } return createRemoteConfigRepository(namespace); }
2.3 Apollo ConfigRepository 分层设计 Apollo ConfigRepository 适用于加载配置的接口,默认有两种实现:
LocalFileConfigRepository:从本地文件中加载配置。
RemoteConfigRepository:从远端Apollo Server加载配置。
在调用 DefaultConfigFactory#createConfigRepository
创建 ConfigRepository
时默认会创建多级对象,创建时的顺序为:RemoteConfigRepository
–> LocalFileConfigRepository
–> DefaultConfig
其中 DefaultConfig
持有 LocalFileConfigRepository
,LocalFileConfigRepository
持有 RemoteConfigRepository
。
DefaultConfig
监听 LocalFileConfigRepository
变化,LocalFileConfigRepository
监听 RemoteConfigRepository
变化。
创建流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ConfigRepository createConfigRepository (String namespace) { if (m_configUtil.isPropertyFileCacheEnabled()) { return createLocalConfigRepository(namespace); } return createRemoteConfigRepository(namespace); } LocalFileConfigRepository createLocalConfigRepository (String namespace) { if (m_configUtil.isInLocalMode()) { logger.warn( "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====" , namespace); return new LocalFileConfigRepository (namespace); } return new LocalFileConfigRepository (namespace, createRemoteConfigRepository(namespace)); } RemoteConfigRepository createRemoteConfigRepository (String namespace) { return new RemoteConfigRepository (namespace); }
Apollo 通过多层 ConfigRepository 设计实现如下配置加载机制,既保证了配置的实时性,又保证了Apollo Server出现故障时对接入的服务影响最小:
客户端和服务端保持了一个长连接(通过Http Long Polling实现),从而能第一时间获得配置更新的推送(RemoteConfigRepository)
客户端还会定时从Apollo配置中心服务端拉取应用的最新配置。
这是一个fallback机制,为了防止推送机制失效导致配置不更新。客户端定时拉取会上报本地版本,所以一般情况下,对于定时拉取的操作,服务端都会返回304 - Not Modified
定时频率默认为每5分钟拉取一次,客户端也可以通过在运行时指定System Property:apollo.refreshInterval来覆盖,单位为分钟
客户端会把从服务端获取到的配置在本地文件系统缓存一份在遇到服务不可用,或网络不通的时候,依然能从本地恢复配置(LocalFileConfigRepository)
客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中(DefaultConfig)
2.4.1 RemoteConfigRepository RemoteConfigRepository
实现 AbstractConfigRepository
抽象类,远程配置Repository。实现从Apollo Server拉取配置,并缓存在内存中。定时 + 实时刷新缓存:
构造方法 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public class RemoteConfigRepository extends AbstractConfigRepository { private static final Logger logger = DeferredLoggerFactory.getLogger(RemoteConfigRepository.class); private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR); private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&" ).withKeyValueSeparator("=" ); private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper(); private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper(); private final ConfigServiceLocator m_serviceLocator; private final HttpClient m_httpClient; private final ConfigUtil m_configUtil; private final RemoteConfigLongPollService remoteConfigLongPollService; private volatile AtomicReference<ApolloConfig> m_configCache; private final String m_namespace; private final static ScheduledExecutorService m_executorService; private final AtomicReference<ServiceDTO> m_longPollServiceDto; private final AtomicReference<ApolloNotificationMessages> m_remoteMessages; private final RateLimiter m_loadConfigRateLimiter; private final AtomicBoolean m_configNeedForceRefresh; private final SchedulePolicy m_loadConfigFailSchedulePolicy; private static final Gson GSON = new Gson (); static { m_executorService = Executors.newScheduledThreadPool(1 , ApolloThreadFactory.create("RemoteConfigRepository" , true )); } public RemoteConfigRepository (String namespace) { m_namespace = namespace; m_configCache = new AtomicReference <>(); m_configUtil = ApolloInjector.getInstance(ConfigUtil.class); m_httpClient = ApolloInjector.getInstance(HttpClient.class); m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class); remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class); m_longPollServiceDto = new AtomicReference <>(); m_remoteMessages = new AtomicReference <>(); m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS()); m_configNeedForceRefresh = new AtomicBoolean (true ); m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy (m_configUtil.getOnErrorRetryInterval(), m_configUtil.getOnErrorRetryInterval() * 8 ); this .trySync(); this .schedulePeriodicRefresh(); this .scheduleLongPollingRefresh(); } }
RemoteConfigRepository
构造方法中分别调用了 trySync()
尝试同步配置,schedulePeriodicRefresh()
初始化定时刷新配置的任务,scheduleLongPollingRefresh()
注册自己到 RemoteConfigLongPollService
中实现配置更新的实时通知。
trySync() :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public abstract class AbstractConfigRepository implements ConfigRepository { protected boolean trySync () { try { sync(); return true ; } catch (Throwable ex) { Tracer.logEvent("ApolloConfigException" , ExceptionUtil.getDetailMessage(ex)); logger .warn("Sync config failed, will retry. Repository {}, reason: {}" , this .getClass(), ExceptionUtil .getDetailMessage(ex)); } return false ; } }
RemoteConfigRepository
构造方法中调用的 trySync
方法,最终会调用实现类的自己的 sync
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Override protected synchronized void sync () { Transaction transaction = Tracer.newTransaction("Apollo.ConfigService" , "syncRemoteConfig" ); try { ApolloConfig previous = m_configCache.get(); ApolloConfig current = loadApolloConfig(); if (previous != current) { logger.debug("Remote Config refreshed!" ); m_configCache.set(current); this .fireRepositoryChange(m_namespace, this .getConfig()); } if (current != null ) { Tracer.logEvent(String.format("Apollo.Client.Configs.%s" , current.getNamespaceName()), current.getReleaseKey()); } transaction.setStatus(Transaction.SUCCESS); } catch (Throwable ex) { transaction.setStatus(ex); throw ex; } finally { transaction.complete(); } }
调用 loadApolloConfig()
方法加载远端配置信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 private ApolloConfig loadApolloConfig () { if (!m_loadConfigRateLimiter.tryAcquire(5 , TimeUnit.SECONDS)) { try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { } } String appId = m_configUtil.getAppId(); String cluster = m_configUtil.getCluster(); String dataCenter = m_configUtil.getDataCenter(); String secret = m_configUtil.getAccessKeySecret(); Tracer.logEvent("Apollo.Client.ConfigMeta" , STRING_JOINER.join(appId, cluster, m_namespace)); int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1 ; long onErrorSleepTime = 0 ; Throwable exception = null ; List<ServiceDTO> configServices = getConfigServices(); String url = null ; retryLoopLabel: for (int i = 0 ; i < maxRetries; i++) { List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices); Collections.shuffle(randomConfigServices); if (m_longPollServiceDto.get() != null ) { randomConfigServices.add(0 , m_longPollServiceDto.getAndSet(null )); } for (ServiceDTO configService : randomConfigServices) { if (onErrorSleepTime > 0 ) { logger.warn( "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}" , onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace); try { m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime); } catch (InterruptedException e) { } } url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace, dataCenter, m_remoteMessages.get(), m_configCache.get()); logger.debug("Loading config from {}" , url); HttpRequest request = new HttpRequest (url); if (!StringUtils.isBlank(secret)) { Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret); request.setHeaders(headers); } Transaction transaction = Tracer.newTransaction("Apollo.ConfigService" , "queryConfig" ); transaction.addData("Url" , url); try { HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class); m_configNeedForceRefresh.set(false ); m_loadConfigFailSchedulePolicy.success(); transaction.addData("StatusCode" , response.getStatusCode()); transaction.setStatus(Transaction.SUCCESS); if (response.getStatusCode() == 304 ) { logger.debug("Config server responds with 304 HTTP status code." ); return m_configCache.get(); } ApolloConfig result = response.getBody(); logger.debug("Loaded config for {}: {}" , m_namespace, result); return result; } catch (ApolloConfigStatusCodeException ex) { ApolloConfigStatusCodeException statusCodeException = ex; if (ex.getStatusCode() == 404 ) { String message = String.format( "Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " + "please check whether the configs are released in Apollo!" , appId, cluster, m_namespace); statusCodeException = new ApolloConfigStatusCodeException (ex.getStatusCode(), message); } Tracer.logEvent("ApolloConfigException" , ExceptionUtil.getDetailMessage(statusCodeException)); transaction.setStatus(statusCodeException); exception = statusCodeException; if (ex.getStatusCode() == 404 ) { break retryLoopLabel; } } catch (Throwable ex) { Tracer.logEvent("ApolloConfigException" , ExceptionUtil.getDetailMessage(ex)); transaction.setStatus(ex); exception = ex; } finally { transaction.complete(); } onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() : m_loadConfigFailSchedulePolicy.fail(); } } String message = String.format( "Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s" , appId, cluster, m_namespace, url); throw new ApolloConfigException (message, exception); }
如果配置发生变更,回调 LocalFileConfigRepository.onRepositoryChange
方法,从而将最新配置同步到 LocalFileConfigRepository
。而 LocalFileConfigRepository
在更新完本地文件缓存配置后,同样会回调 DefaultConfig.onRepositoryChange
同步内存缓存,具体源码我们在后文分析。
schedulePeriodicRefresh :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void schedulePeriodicRefresh () { logger.debug("Schedule periodic refresh with interval: {} {}" , m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit()); m_executorService.scheduleAtFixedRate( new Runnable () { @Override public void run () { Tracer.logEvent("Apollo.ConfigService" , String.format("periodicRefresh: %s" , m_namespace)); logger.debug("refresh config for namespace: {}" , m_namespace); trySync(); Tracer.logEvent("Apollo.Client.Version" , Apollo.VERSION); } }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit()); }
**scheduleLongPollingRefresh()**:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void scheduleLongPollingRefresh () { remoteConfigLongPollService.submit(m_namespace, this ); } public void onLongPollNotified (ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) { m_longPollServiceDto.set(longPollNotifiedServiceDto); m_remoteMessages.set(remoteMessages); m_executorService.submit(new Runnable () { @Override public void run () { m_configNeedForceRefresh.set(true ); trySync(); } }); }
2.4.2 RemoteConfigLongPollService RemoteConfigLongPollService
远程配置长轮询服务。负责长轮询 Apollo Server 的配置变更通知 /notifications/v2
接口。当有新的通知时,触发 RemoteConfigRepository.onLongPollNotified
,立即轮询 Apollo Server 的配置读取/configs/{appId}/{clusterName}/{namespace:.+}
接口。
构造方法 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public class RemoteConfigLongPollService { private static final Logger logger = LoggerFactory.getLogger(RemoteConfigLongPollService.class); private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR); private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&" ).withKeyValueSeparator("=" ); private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper(); private static final long INIT_NOTIFICATION_ID = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER; private static final int LONG_POLLING_READ_TIMEOUT = 90 * 1000 ; private final ExecutorService m_longPollingService; private final AtomicBoolean m_longPollingStopped; private SchedulePolicy m_longPollFailSchedulePolicyInSecond; private RateLimiter m_longPollRateLimiter; private final AtomicBoolean m_longPollStarted; private final Multimap<String, RemoteConfigRepository> m_longPollNamespaces; private final ConcurrentMap<String, Long> m_notifications; private final Map<String, ApolloNotificationMessages> m_remoteNotificationMessages; private Type m_responseType; private static final Gson GSON = new Gson (); private ConfigUtil m_configUtil; private HttpClient m_httpClient; private ConfigServiceLocator m_serviceLocator; private final ConfigServiceLoadBalancerClient configServiceLoadBalancerClient = ServiceBootstrap.loadPrimary( ConfigServiceLoadBalancerClient.class); public RemoteConfigLongPollService () { m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy (1 , 120 ); m_longPollingStopped = new AtomicBoolean (false ); m_longPollingService = Executors.newSingleThreadExecutor( ApolloThreadFactory.create("RemoteConfigLongPollService" , true )); m_longPollStarted = new AtomicBoolean (false ); m_longPollNamespaces = Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create()); m_notifications = Maps.newConcurrentMap(); m_remoteNotificationMessages = Maps.newConcurrentMap(); m_responseType = new TypeToken <List<ApolloConfigNotification>>() { }.getType(); m_configUtil = ApolloInjector.getInstance(ConfigUtil.class); m_httpClient = ApolloInjector.getInstance(HttpClient.class); m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class); m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS()); } }
submit :
1 2 3 4 5 6 7 8 9 10 11 public boolean submit (String namespace, RemoteConfigRepository remoteConfigRepository) { boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository); m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID); if (!m_longPollStarted.get()) { startLongPolling(); } return added; }
startLongPolling :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private void startLongPolling () { if (!m_longPollStarted.compareAndSet(false , true )) { return ; } try { final String appId = m_configUtil.getAppId(); final String cluster = m_configUtil.getCluster(); final String dataCenter = m_configUtil.getDataCenter(); final String secret = m_configUtil.getAccessKeySecret(); final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills(); m_longPollingService.submit(new Runnable () { @Override public void run () { if (longPollingInitialDelayInMills > 0 ) { try { logger.debug("Long polling will start in {} ms." , longPollingInitialDelayInMills); TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills); } catch (InterruptedException e) { } } doLongPollingRefresh(appId, cluster, dataCenter, secret); } }); } catch (Throwable ex) { m_longPollStarted.set(false ); ApolloConfigException exception = new ApolloConfigException ("Schedule long polling refresh failed" , ex); Tracer.logError(exception); logger.warn(ExceptionUtil.getDetailMessage(exception)); } }
doLongPollingRefresh :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 private void doLongPollingRefresh (String appId, String cluster, String dataCenter, String secret) { ServiceDTO lastServiceDto = null ; while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) { if (!m_longPollRateLimiter.tryAcquire(5 , TimeUnit.SECONDS)) { try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { } } Transaction transaction = Tracer.newTransaction("Apollo.ConfigService" , "pollNotification" ); String url = null ; try { if (lastServiceDto == null ) { lastServiceDto = this .resolveConfigService(); } url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter, m_notifications); logger.debug("Long polling from {}" , url); HttpRequest request = new HttpRequest (url); request.setReadTimeout(LONG_POLLING_READ_TIMEOUT); if (!StringUtils.isBlank(secret)) { Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret); request.setHeaders(headers); } transaction.addData("Url" , url); final HttpResponse<List<ApolloConfigNotification>> response = m_httpClient.doGet(request, m_responseType); logger.debug("Long polling response: {}, url: {}" , response.getStatusCode(), url); if (response.getStatusCode() == 200 && response.getBody() != null ) { updateNotifications(response.getBody()); updateRemoteNotifications(response.getBody()); transaction.addData("Result" , response.getBody().toString()); notify(lastServiceDto, response.getBody()); } if (response.getStatusCode() == 304 && ThreadLocalRandom.current().nextBoolean()) { lastServiceDto = null ; } m_longPollFailSchedulePolicyInSecond.success(); transaction.addData("StatusCode" , response.getStatusCode()); transaction.setStatus(Transaction.SUCCESS); } catch (Throwable ex) { lastServiceDto = null ; Tracer.logEvent("ApolloConfigException" , ExceptionUtil.getDetailMessage(ex)); transaction.setStatus(ex); long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail(); logger.warn( "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}" , sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex)); try { TimeUnit.SECONDS.sleep(sleepTimeInSecond); } catch (InterruptedException ie) { } } finally { transaction.complete(); } } }
notify :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void notify (ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) { if (notifications == null || notifications.isEmpty()) { return ; } for (ApolloConfigNotification notification : notifications) { String namespaceName = notification.getNamespaceName(); List<RemoteConfigRepository> toBeNotified = Lists.newArrayList(m_longPollNamespaces.get(namespaceName)); ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName); ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone(); toBeNotified.addAll(m_longPollNamespaces .get(String.format("%s.%s" , namespaceName, ConfigFileFormat.Properties.getValue()))); for (RemoteConfigRepository remoteConfigRepository : toBeNotified) { try { remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages); } catch (Throwable ex) { Tracer.logError(ex); } } } }
至此 RemoteConfigRepository
从远端拉取配置的整个流程就已经分析完毕,Spring启动流程创建 RemoteConfigRepository
对象时会尝试第一次拉取namespace对应的配置,拉取完后会创建定时拉取任务和长轮询任务,长轮询任务调用 RemoteConfigLongPollService#startLongPolling
来实现,若服务端配置发生变更,则会回调 RemoteConfigRepository#onLongPollNotified
方法,在这个方法中会调用 RemoteConfigRepository#sync
方法重新拉取对应 namespace 的远端配置。
2.4.3 LocalFileConfigRepository 前文我们提到当服务端配置发生变更后,RemoteConfigRepository
会收到配置变更通知并调用 sync
方法同步配置,若配置发生变更,则会继续回调 LocalFileConfigRepository#onRepositoryChange
:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void onRepositoryChange (String namespace, Properties newProperties) { if (newProperties.equals(m_fileProperties)) { return ; } Properties newFileProperties = propertiesFactory.getPropertiesInstance(); newFileProperties.putAll(newProperties); updateFileProperties(newFileProperties, m_upstream.getSourceType()); this .fireRepositoryChange(namespace, newProperties); }
2.4.4 DefaultConfig 当 LocalFileConfigRepository
收到 RemoteConfigRepository
的配置变更通知并更新本地配置文件后,会继续回调 DefaultConfig#onRepositoryChange
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public synchronized void onRepositoryChange (String namespace, Properties newProperties) { if (newProperties.equals(m_configProperties.get())) { return ; } ConfigSourceType sourceType = m_configRepository.getSourceType(); Properties newConfigProperties = propertiesFactory.getPropertiesInstance(); newConfigProperties.putAll(newProperties); Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties, sourceType); if (actualChanges.isEmpty()) { return ; } this .fireConfigChange(m_namespace, actualChanges); Tracer.logEvent("Apollo.Client.ConfigChanges" , m_namespace); }
整体流程:
更新配置缓存,并计算实际发生变更的key,key为发生变更的配置key,value是发生变更的配置信息:
例如我们变更 test.hello
配置以及新增一个 test.hello3
配置:
发送属性变更通知,注意在这里就不像 Resporsitory
层发送的是整个仓库的变更事件,而发送的是某一个属性变更的事件。Repository配置变更事件监听是实现 RepositoryChangeListener
,属性变更事件监听是实现 ConfigChangeListener
三. Apollo如何实现Spring Bean配置属性的实时更新 在 SpringBoot 中使用 Apollo 客户端一般都需要启用 @EnableApolloConfig
注解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(ApolloConfigRegistrar.class) public @interface EnableApolloConfig { String[] value() default {ConfigConsts.NAMESPACE_APPLICATION}; int order () default Ordered.LOWEST_PRECEDENCE; }
@EnableApolloConfig
通过 @Import
注解注入了 ApolloConfigRegistrar
类,该类是Apollo组件注入的入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ApolloConfigRegistrar implements ImportBeanDefinitionRegistrar , EnvironmentAware { private final ApolloConfigRegistrarHelper helper = ServiceBootstrap.loadPrimary(ApolloConfigRegistrarHelper.class); @Override public void registerBeanDefinitions (AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { helper.registerBeanDefinitions(importingClassMetadata, registry); } @Override public void setEnvironment (Environment environment) { this .helper.setEnvironment(environment); } }
该类实现了两个扩展点:
EnvironmentAware:凡注册到Spring容器内的bean,实现了EnvironmentAware接口重写setEnvironment方法后,在工程启动时可以获得application.properties的配置文件配置的属性值。
ImportBeanDefinitionRegistrar:该扩展点作用是通过自定义的方式直接向容器中注册bean。实现ImportBeanDefinitionRegistrar接口,在重写的registerBeanDefinitions方法中定义的Bean,就和使用xml中定义Bean效果是一样的。ImportBeanDefinitionRegistrar是Spring框架提供的一种机制,允许通过api代码向容器批量注册BeanDefinition。它实现了BeanFactoryPostProcessor接口,可以在所有bean定义加载到容器之后,bean实例化之前,对bean定义进行修改。使用ImportBeanDefinitionRegistrar,我们可以向容器中批量导入bean,而不需要在配置文件中逐个配置。
ApolloConfigRegistrar#setEnvironment
将 Environment
暂存下来;ApolloConfigRegistrar#registerBeanDefinitions
中调用 ApolloConfigRegistrarHelper.registerBeanDefinitions
注册了一系列Spring扩展点实例至Ioc容器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public void registerBeanDefinitions (AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { AnnotationAttributes attributes = AnnotationAttributes .fromMap(importingClassMetadata.getAnnotationAttributes(EnableApolloConfig.class.getName())); final String[] namespaces = attributes.getStringArray("value" ); final int order = attributes.getNumber("order" ); final String[] resolvedNamespaces = this .resolveNamespaces(namespaces); PropertySourcesProcessor.addNamespaces(Lists.newArrayList(resolvedNamespaces), order); Map<String, Object> propertySourcesPlaceholderPropertyValues = new HashMap <>(); propertySourcesPlaceholderPropertyValues.put("order" , 0 ); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesPlaceholderConfigurer.class, propertySourcesPlaceholderPropertyValues); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, AutoUpdateConfigChangeListener.class); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesProcessor.class); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, ApolloAnnotationProcessor.class); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, SpringValueProcessor.class); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, SpringValueDefinitionProcessor.class); }
PropertySourcesProcessor
是 Apollo 最关键的组件之一,并且其实例化优先级也是最高的,PropertySourcesProcessor#postProcessBeanFactory()
会在该类实例化的时候被回调,该方法的处理如下:
1 2 3 4 5 6 7 8 9 10 @Override public void postProcessBeanFactory (ConfigurableListableBeanFactory beanFactory) throws BeansException { this .configUtil = ApolloInjector.getInstance(ConfigUtil.class); initializePropertySources(); initializeAutoUpdatePropertiesFeature(beanFactory); }
调用 PropertySourcesProcessor#initializePropertySources()
拉取远程 namespace 配置:
调用 PropertySourcesProcessor#initializeAutoUpdatePropertiesFeature()
给所有缓存在本地的 Config 配置添加监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void initializeAutoUpdatePropertiesFeature (ConfigurableListableBeanFactory beanFactory) { if (!AUTO_UPDATE_INITIALIZED_BEAN_FACTORIES.add(beanFactory)) { return ; } ConfigChangeListener configChangeEventPublisher = changeEvent -> applicationEventPublisher.publishEvent(new ApolloConfigChangeEvent (changeEvent)); List<ConfigPropertySource> configPropertySources = configPropertySourceFactory.getAllConfigPropertySources(); for (ConfigPropertySource configPropertySource : configPropertySources) { configPropertySource.addChangeListener(configChangeEventPublisher); } }
ConfigPropertySource#addChangeListener()
方法如下,在上文中分析过 ConfigPropertySource
包装类,我们知道这里的 this.source.addChangeListener(listener)
实际调用的是 DefaultConfig#addChangeListener()
方法。在上文中我们了解DefaultConfig
收到来自 LocalFileConfigRepository
配置变更后,会计算出具体的属性变更信息,并回调ConfigChangeListener#onChange
方法,而在这里的定义中,onChange
方法会发送一个 ApolloConfigChangeEvent
类型的Spring事件:
1 2 ConfigChangeListener configChangeEventPublisher = changeEvent -> applicationEventPublisher.publishEvent(new ApolloConfigChangeEvent (changeEvent));
在 DefaultApolloConfigRegistrarHelper#registerBeanDefinitions
会注册 AutoUpdateConfigChangeListener
Bean进入Ioc容器,而该监听器就是用于监听 ApolloConfigChangeEvent
事件,当属性发生变更调用 AutoUpdateConfigChangeListener#onChange
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public void onChange (ConfigChangeEvent changeEvent) { Set<String> keys = changeEvent.changedKeys(); if (CollectionUtils.isEmpty(keys)) { return ; } for (String key : keys) { Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key); if (targetValues == null || targetValues.isEmpty()) { continue ; } for (SpringValue val : targetValues) { updateSpringValue(val); } } }
onChange
方法会调用 updateSpringValue
更新对应Bean的属性值:
1 2 3 4 5 6 7 8 9 10 11 12 private void updateSpringValue (SpringValue springValue) { try { Object value = resolvePropertyValue(springValue); springValue.update(value); logger.info("Auto update apollo changed value successfully, new value: {}, {}" , value, springValue); } catch (Throwable ex) { logger.error("Auto update apollo changed value failed, {}" , springValue.toString(), ex); } }
首先调用 AutoUpdateConfigChangeListener#resolvePropertyValue()
方法借助 SpringBoot 的组件将 @Value 中配置的占位符替换为 PropertySource 中的对应 key 的属性值,此处涉及到 Spring 创建 Bean 对象时的属性注入机制,比较复杂,暂不作深入分析。
调用 SpringValue#update()
方法实际完成属性值的更新。
SpringValue#update()
方法其实就是使用反射机制运行时修改 Bean 对象中的成员变量,至此自动更新完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public void update (Object newVal) throws IllegalAccessException, InvocationTargetException { if (isField()) { injectField(newVal); } else { injectMethod(newVal); } } private void injectField (Object newVal) throws IllegalAccessException { Object bean = beanRef.get(); if (bean == null ) { return ; } boolean accessible = field.isAccessible(); field.setAccessible(true ); field.set(bean, newVal); field.setAccessible(accessible); }
四. 总结 Apollo 启动时会在 ApolloApplicationContextInitializer
扩展点开始加载远端配置,而Apollo客户端获取配置采用多层设计 DefaultConfig
->LocalFileConfigRepository
->RemoteConfigRepository
,最终由 RemoteConfigRepository
完成远端配置拉取
每一层的作用各不一样:
RemoteConfigRepository
负责拉取远端配置并通知 LocalFileConfigRepository
更新配置。
LocalFileConfigRepository
负责将远端配置缓存至本地文件,设计这一层主要是为了在Apollo Server 不可用时保证业务服务的可用性。当 LocalFileConfigRepository
配置发生变更时负责通知 DefaultConfig
更新配置。
DefaultConfig
负责缓存Apollo配置信息在内存中,当 DefaultConfig
配置发生变更时,会回调 AutoUpdateConfigChangeListener#onChange
方法更新Java Bean 中的属性。
Apollo 客户端为了能够实时更新 Apollo Server 中的配置,使用下列手段来实现服务端配置变更的感知:
参考文章:
Apollo 客户端集成 SpringBoot 的源码分析(1)-启动时配置获取_spring 无法实例化apolloapplicationcontextinitializer的解决-CSDN博客
Apollo 客户端集成 SpringBoot 的源码分析(2)-配置属性的注入更新-CSDN博客
Spring Boot 启动生命周期分析,每个组件的执行时序,扩展点分析等【建议收藏】(持续更新,见到一个分析一个) - 掘金 (juejin.cn)
apollo client 自动更新深入解析 - 掘金 (juejin.cn)
Apollo核心源码解析(二):Apollo Client轮询配置(ConfigRepository与RemoteConfigLongPollService)、配置中心通用设计模型_apollo客户端和服务端保持长连接的源码-CSDN博客
SpringBoot快速入门-ImportBeanDefinitionRegistrar详解 – 编程技术之美-IT之美 (itzhimei.com)