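This article analyzes the source code of apollo-client version 2.1.0.
Apollo is an open-source configuration center from Ctrip. It centralizes the management of application configuration across environments and clusters, pushes configuration changes to applications in real time, and provides standardized permission and release-workflow governance.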
Apollo manages configuration along four dimensions:
application
environment
cluster
namespace
1. Integrating Apollo with Spring Boot

1.1 Add the Apollo client dependency

    <dependency>
      <groupId>com.ctrip.framework.apollo</groupId>
      <artifactId>apollo-client</artifactId>
      <version>2.1.0</version>
    </dependency>
1.2 Configure Apollo

    app:
      id: apollo-test
    apollo:
      # Meta Server address (the value is omitted in the original)
      meta:
      autoUpdateInjectedSpringProperties: true
      bootstrap:
        enabled: true
        namespaces: application.yaml
        eagerLoad:
          enabled: true
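apollo.bootstrap.enabled: injects the managed application.properties configuration into the Spring container during the application startup phase.
apollo.bootstrap.eagerLoad.enabled: moves Apollo configuration loading ahead of logging system initialization. Starting with version 1.2.0, if you want logging-related configuration (such as logging.level.root=info or parameters in logback-spring.xml) to be managed by Apollo as well, enable this option.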
1.3 Start the project

After the project starts, changing a value in Apollo automatically updates the corresponding configuration in the Spring Boot application:

    [Apollo-Config-1] c.f.a.s.p.AutoUpdateConfigChangeListener : Auto update apollo changed value successfully, new value: hahhahaha12311, key: test.hello, beanName: mongoController, field: cn.bigcoder.mongo.mongodemo.web.MongoController.hello
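2. How Spring Boot loads Apollo configuration at startup

2.1 ApolloApplicationContextInitializer

The spring.factories file is a key part of Spring Boot's SPI mechanism: it declares the implementation classes for Spring Boot's various extension points. The Apollo client uses this mechanism to integrate with Spring Boot. The META-INF/spring.factories file in the apollo-client package is configured as follows: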
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.ctrip.framework.apollo.spring.boot.ApolloAutoConfiguration
    org.springframework.context.ApplicationContextInitializer=\
    com.ctrip.framework.apollo.spring.boot.ApolloApplicationContextInitializer
    org.springframework.boot.env.EnvironmentPostProcessor=\
    com.ctrip.framework.apollo.spring.boot.ApolloApplicationContextInitializer
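ApolloApplicationContextInitializer implements two extension points, ApplicationContextInitializer and EnvironmentPostProcessor, so that apollo-client can load configuration from the Apollo Server before the Spring container starts.

EnvironmentPostProcessor: for a bean to use a configuration property, the property must already be in Spring's Environment before the bean is instantiated. In other words, the loading hook has to run before the application context is refreshed, and EnvironmentPostProcessor satisfies this requirement.
ApplicationContextInitializer: a long-standing Spring Framework concept. Its main purpose is to let us further configure or process a ConfigurableApplicationContext (or a subtype) before that context is refreshed.

Both extension points load configuration before the application context is refreshed, but EnvironmentPostProcessor runs earlier than ApplicationContextInitializer, which allows Apollo configuration to be loaded before the logging system is initialized.

ApolloApplicationContextInitializer#postProcessEnvironment and ApolloApplicationContextInitializer#initialize(ConfigurableApplicationContext) are the two entry points: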
    @Override
    public void postProcessEnvironment(ConfigurableEnvironment configurableEnvironment,
        SpringApplication springApplication) {
      // Copy settings such as app.id into system properties first.
      initializeSystemProperty(configurableEnvironment);

      Boolean eagerLoadEnabled = configurableEnvironment.getProperty(
          PropertySourcesConstants.APOLLO_BOOTSTRAP_EAGER_LOAD_ENABLED, Boolean.class, false);

      // Eager loading is off: leave the work to the ApplicationContextInitializer hook.
      if (!eagerLoadEnabled) {
        return;
      }

      Boolean bootstrapEnabled = configurableEnvironment.getProperty(
          PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false);

      if (bootstrapEnabled) {
        // The logging system is not ready yet, so log output is deferred.
        DeferredLogger.enable();
        initialize(configurableEnvironment);
      }
    }
    @Override
    public void initialize(ConfigurableApplicationContext context) {
      ConfigurableEnvironment environment = context.getEnvironment();

      if (!environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false)) {
        logger.debug("Apollo bootstrap config is not enabled for context {}, see property: ${{}}",
            context, PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED);
        return;
      }
      logger.debug("Apollo bootstrap config is enabled for context {}", context);

      initialize(environment);
    }
Both extension points eventually call ApolloApplicationContextInitializer#initialize(ConfigurableEnvironment environment), which initializes the Apollo client and loads the remote configuration:
    protected void initialize(ConfigurableEnvironment environment) {
      final ConfigUtil configUtil = ApolloInjector.getInstance(ConfigUtil.class);
      if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
        // Already initialized by the other hook: replay the deferred logs and stop.
        DeferredLogger.replayTo();
        if (configUtil.isOverrideSystemProperties()) {
          PropertySourcesUtil.ensureBootstrapPropertyPrecedence(environment);
        }
        return;
      }

      String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES,
          ConfigConsts.NAMESPACE_APPLICATION);
      logger.debug("Apollo bootstrap namespaces: {}", namespaces);
      List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);

      CompositePropertySource composite;
      if (configUtil.isPropertyNamesCacheEnabled()) {
        composite = new CachedCompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
      } else {
        composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
      }
      // One PropertySource per namespace, added in the configured order.
      for (String namespace : namespaceList) {
        Config config = ConfigService.getConfig(namespace);
        composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
      }
      if (!configUtil.isOverrideSystemProperties()) {
        if (environment.getPropertySources().contains(StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME)) {
          environment.getPropertySources().addAfter(StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME, composite);
          return;
        }
      }
      environment.getPropertySources().addFirst(composite);
    }
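Because there are two possible triggers, the method first checks whether Spring's Environment already contains a property source named ApolloBootstrapPropertySources. If it does, there is nothing more to do and the method returns.
It reads the apollo.bootstrap.namespaces property from the Environment to get the bootstrap namespaces string, falling back to the default application namespace if the property is not set.
It splits the namespaces string on commas and calls ConfigService#getConfig() for each namespace to load its configuration.
It creates a CompositePropertySource. apollo-client can load several namespaces at startup; each namespace maps to one PropertySource, and these are wrapped in a single CompositePropertySource. A property lookup iterates over the per-namespace PropertySource objects and returns as soon as a match is found, which means that namespaces loaded earlier take precedence: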
    public class CompositePropertySource extends EnumerablePropertySource<Object> {

      private final Set<PropertySource<?>> propertySources = new LinkedHashSet<>();

      @Override
      @Nullable
      public Object getProperty(String name) {
        for (PropertySource<?> propertySource : this.propertySources) {
          Object candidate = propertySource.getProperty(name);
          if (candidate != null) {
            return candidate;
          }
        }
        return null;
      }
    }
It calls ConfigPropertySourceFactory#getConfigPropertySource() to cache the configuration pulled from the remote server and wrap it as a PropertySource. All remote configurations end up aggregated in a CompositePropertySource named ApolloBootstrapPropertySources:
    public ConfigPropertySource getConfigPropertySource(String name, Config source) {
      ConfigPropertySource configPropertySource = new ConfigPropertySource(name, source);

      configPropertySources.add(configPropertySource);

      return configPropertySource;
    }
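Finally, the CompositePropertySource is added to Spring's Environment. By default it is inserted at the head of the property source list; when isOverrideSystemProperties() returns false and a system environment property source exists, it is placed right after that source instead. Property lookup walks this list and returns the first match, so when the same key exists in several sources, the one nearer the front wins. As a result, when a local configuration file and Apollo define the same key, the value from Apollo takes precedence.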
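The precedence rule described above can be illustrated with a minimal, self-contained sketch. It does not use Spring's classes: `FirstMatchLookup` and its `lookup` method are hypothetical names that only mimic the first-match-wins iteration, with plain maps standing in for property sources.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a list of property sources searched in order.
class FirstMatchLookup {

    // Returns the value from the first source that contains the key, or null if none does.
    static String lookup(List<Map<String, String>> sources, String key) {
        for (Map<String, String> source : sources) {
            String candidate = source.get(key);
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The Apollo source is placed first, the local file second.
        Map<String, String> apollo = Map.of("test.hello", "from-apollo");
        Map<String, String> local = Map.of("test.hello", "from-local", "server.port", "8080");
        List<Map<String, String>> sources = List.of(apollo, local);

        System.out.println(lookup(sources, "test.hello"));  // the Apollo value shadows the local one
        System.out.println(lookup(sources, "server.port")); // only the local source defines this key
    }
}
```

Swapping the order of the two maps in `sources` would make the local value win, which is the effect of where the composite source is inserted into the Environment.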
2.2 Loading configuration from the remote server

ApolloApplicationContextInitializer.initialize calls ConfigService.getConfig() to obtain the configuration:
    public static Config getConfig(String namespace) {
      return s_instance.getManager().getConfig(namespace);
    }

    private ConfigManager getManager() {
      if (m_configManager == null) {
        synchronized (this) {
          if (m_configManager == null) {
            m_configManager = ApolloInjector.getInstance(ConfigManager.class);
          }
        }
      }

      return m_configManager;
    }
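The ConfigManager instance is obtained through ApolloInjector, which is built on Java's ServiceLoader mechanism. ConfigManager has only one implementation class, so the call ends up in DefaultConfigManager#getConfig():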
    @Override
    public Config getConfig(String namespace) {
      Config config = m_configs.get(namespace);

      if (config == null) {
        synchronized (this) {
          config = m_configs.get(namespace);

          if (config == null) {
            ConfigFactory factory = m_factoryManager.getFactory(namespace);

            config = factory.create(namespace);
            m_configs.put(namespace, config);
          }
        }
      }

      return config;
    }
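The method first looks in the cache and only pulls from the remote server on a miss. Note that the null check is repeated inside the synchronized block: this is the double-checked locking pattern.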
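For comparison, the same load-once-per-namespace behaviour can be sketched with `ConcurrentHashMap.computeIfAbsent` instead of an explicit double-checked lock. This is an alternative technique, not what DefaultConfigManager does; `NamespaceCache`, its `loader` function and the `loads` counter are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical cache that creates at most one value per namespace.
class NamespaceCache<V> {
    private final ConcurrentMap<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> loader;
    // Counts loader invocations so the load-once behaviour can be observed.
    final AtomicInteger loads = new AtomicInteger();

    NamespaceCache(Function<String, V> loader) {
        this.loader = loader;
    }

    V get(String namespace) {
        // computeIfAbsent applies the function atomically, at most once per absent key.
        return cache.computeIfAbsent(namespace, ns -> {
            loads.incrementAndGet();
            return loader.apply(ns);
        });
    }

    public static void main(String[] args) {
        NamespaceCache<String> cache = new NamespaceCache<>(ns -> "config-of-" + ns);
        cache.get("application");
        cache.get("application"); // served from the cache, the loader is not called again
        System.out.println(cache.loads.get());
    }
}
```

One trade-off: the function passed to `computeIfAbsent` must not update other mappings of the same map, whereas the explicit lock used in the article's code places no such restriction on what the factory does.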
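To pull remotely, the manager first obtains a ConfigFactory through ConfigFactoryManager#getFactory() and then calls DefaultConfigFactory#create() to fetch the configuration from the Apollo Server.
DefaultConfigFactory#create() creates the appropriate ConfigRepository for the type of namespace being loaded: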
    @Override
    public Config create(String namespace) {
      ConfigFileFormat format = determineFileFormat(namespace);

      ConfigRepository configRepository = null;

      // Formats that can be converted to properties (other than properties itself)
      // get a dedicated repository; everything else uses the regular one.
      if (ConfigFileFormat.isPropertiesCompatible(format) &&
          format != ConfigFileFormat.Properties) {
        configRepository = createPropertiesCompatibleFileConfigRepository(namespace, format);
      } else {
        configRepository = createConfigRepository(namespace);
      }

      logger.debug("Created a configuration repository of type [{}] for namespace [{}]",
          configRepository.getClass().getName(), namespace);

      return this.createRepositoryConfig(namespace, configRepository);
    }
Taking the properties format as an example, DefaultConfigFactory.createConfigRepository is called to create the ConfigRepository:
    ConfigRepository createConfigRepository(String namespace) {
      if (m_configUtil.isPropertyFileCacheEnabled()) {
        return createLocalConfigRepository(namespace);
      }
      return createRemoteConfigRepository(namespace);
    }
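The body of determineFileFormat is not shown in this article. As a rough sketch of how a namespace name could be mapped to a format, the example below assumes the format is inferred from the namespace suffix and defaults to properties when no known suffix is present. The `Format` enum and its `determine` method are hypothetical and are not Apollo's ConfigFileFormat.

```java
import java.util.Locale;

// Hypothetical enum covering a few namespace formats; not Apollo's ConfigFileFormat.
enum Format {
    PROPERTIES("properties"), XML("xml"), JSON("json"), YML("yml"), YAML("yaml"), TXT("txt");

    final String suffix;

    Format(String suffix) {
        this.suffix = suffix;
    }

    // Assumption: the format is inferred from the namespace suffix, case-insensitively,
    // and a namespace without a known suffix is treated as properties.
    static Format determine(String namespace) {
        String lower = namespace.toLowerCase(Locale.ROOT);
        for (Format format : values()) {
            if (lower.endsWith("." + format.suffix)) {
                return format;
            }
        }
        return PROPERTIES;
    }
}

class FormatDemo {
    public static void main(String[] args) {
        System.out.println(Format.determine("application.yaml"));
        System.out.println(Format.determine("application"));
    }
}
```

Under this assumption, the application.yaml namespace from section 1.2 would take the properties-compatible branch of create(), while a plain application namespace would go through createConfigRepository.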
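2.3 The layered design of Apollo's ConfigRepository

ConfigRepository is Apollo's interface for loading configuration. It has two default implementations:
RemoteConfigRepository: loads configuration from the remote Apollo Server.
LocalFileConfigRepository: loads configuration from the local file cache.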
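When DefaultConfigFactory#createConfigRepository creates the ConfigRepository, the resulting objects form a three-layer chain: RemoteConfigRepository -> LocalFileConfigRepository -> DefaultConfig. DefaultConfig holds a LocalFileConfigRepository, which in turn holds a RemoteConfigRepository. In the other direction, DefaultConfig listens to LocalFileConfigRepository, and LocalFileConfigRepository listens to RemoteConfigRepository.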
    ConfigRepository createConfigRepository(String namespace) {
      if (m_configUtil.isPropertyFileCacheEnabled()) {
        return createLocalConfigRepository(namespace);
      }
      return createRemoteConfigRepository(namespace);
    }

    LocalFileConfigRepository createLocalConfigRepository(String namespace) {
      if (m_configUtil.isInLocalMode()) {
        logger.warn(
            "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
            namespace);
        return new LocalFileConfigRepository(namespace);
      }
      return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
    }

    RemoteConfigRepository createRemoteConfigRepository(String namespace) {
      return new RemoteConfigRepository(namespace);
    }
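Through this layered ConfigRepository design, Apollo implements the following loading mechanism, which keeps configuration up to date while minimizing the impact on connected services when the Apollo Server fails:
The client keeps a long-lived connection to the server (implemented with HTTP long polling), so it receives configuration update pushes as soon as they happen (RemoteConfigRepository).
The client also pulls the latest configuration from the server periodically. This is a fallback mechanism that guards against stale configuration if the push mechanism fails. The periodic pull reports the client's local version, so in most cases the server responds with 304 Not Modified.
By default the client pulls every 5 minutes. This can be overridden at runtime through the System Property apollo.refreshInterval, expressed in minutes.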
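2.4.1 RemoteConfigRepository

RemoteConfigRepository extends the abstract class AbstractConfigRepository. It is the remote configuration repository: it pulls configuration from the Apollo Server, caches it in memory, and refreshes the cache both periodically and in real time.

Constructor: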
    public class RemoteConfigRepository extends AbstractConfigRepository {
      private static final Logger logger = DeferredLoggerFactory.getLogger(RemoteConfigRepository.class);
      private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
      private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
      private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
      private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();

      private final ConfigServiceLocator m_serviceLocator;
      private final HttpClient m_httpClient;
      private final ConfigUtil m_configUtil;
      private final RemoteConfigLongPollService remoteConfigLongPollService;
      // In-memory cache of the configuration pulled from the server.
      private volatile AtomicReference<ApolloConfig> m_configCache;
      private final String m_namespace;
      private final static ScheduledExecutorService m_executorService;
      private final AtomicReference<ServiceDTO> m_longPollServiceDto;
      private final AtomicReference<ApolloNotificationMessages> m_remoteMessages;
      private final RateLimiter m_loadConfigRateLimiter;
      private final AtomicBoolean m_configNeedForceRefresh;
      private final SchedulePolicy m_loadConfigFailSchedulePolicy;
      private static final Gson GSON = new Gson();

      static {
        m_executorService = Executors.newScheduledThreadPool(1,
            ApolloThreadFactory.create("RemoteConfigRepository", true));
      }

      public RemoteConfigRepository(String namespace) {
        m_namespace = namespace;
        m_configCache = new AtomicReference<>();
        m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
        m_httpClient = ApolloInjector.getInstance(HttpClient.class);
        m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
        remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
        m_longPollServiceDto = new AtomicReference<>();
        m_remoteMessages = new AtomicReference<>();
        m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
        m_configNeedForceRefresh = new AtomicBoolean(true);
        m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
            m_configUtil.getOnErrorRetryInterval() * 8);
        // First pull, then the periodic task, then the long-polling registration.
        this.trySync();
        this.schedulePeriodicRefresh();
        this.scheduleLongPollingRefresh();
      }
    }
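The constructor calls trySync() to attempt a first synchronization, schedulePeriodicRefresh() to set up the periodic refresh, and scheduleLongPollingRefresh() to register itself with RemoteConfigLongPollService for real-time change notifications.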
trySync() :
    public abstract class AbstractConfigRepository implements ConfigRepository {

      protected boolean trySync() {
        try {
          sync();
          return true;
        } catch (Throwable ex) {
          Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
          logger.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(),
              ExceptionUtil.getDetailMessage(ex));
        }
        return false;
      }
    }
The trySync method called in the constructor ends up calling the implementation class's own sync method:
    @Override
    protected synchronized void sync() {
      Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

      try {
        ApolloConfig previous = m_configCache.get();
        ApolloConfig current = loadApolloConfig();

        // Reference comparison: on a 304 response loadApolloConfig returns the cached object.
        if (previous != current) {
          logger.debug("Remote Config refreshed!");
          m_configCache.set(current);
          this.fireRepositoryChange(m_namespace, this.getConfig());
        }

        if (current != null) {
          Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
              current.getReleaseKey());
        }

        transaction.setStatus(Transaction.SUCCESS);
      } catch (Throwable ex) {
        transaction.setStatus(ex);
        throw ex;
      } finally {
        transaction.complete();
      }
    }
sync calls loadApolloConfig() to pull the configuration from the Apollo Server:
    private ApolloConfig loadApolloConfig() {
      // Rate limiting: wait up to 5 seconds for a permit.
      if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
        try {
          TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
          // ignore
        }
      }
      String appId = m_configUtil.getAppId();
      String cluster = m_configUtil.getCluster();
      String dataCenter = m_configUtil.getDataCenter();
      String secret = m_configUtil.getAccessKeySecret();
      Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
      int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
      long onErrorSleepTime = 0;
      Throwable exception = null;

      List<ServiceDTO> configServices = getConfigServices();
      String url = null;
      retryLoopLabel:
      for (int i = 0; i < maxRetries; i++) {
        // Shuffle the config services for a simple form of load balancing.
        List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
        Collections.shuffle(randomConfigServices);
        // Try the server that sent the long-poll notification first.
        if (m_longPollServiceDto.get() != null) {
          randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
        }

        for (ServiceDTO configService : randomConfigServices) {
          if (onErrorSleepTime > 0) {
            logger.warn(
                "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
                onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);

            try {
              m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
            } catch (InterruptedException e) {
              // ignore
            }
          }

          url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
              dataCenter, m_remoteMessages.get(), m_configCache.get());

          logger.debug("Loading config from {}", url);

          HttpRequest request = new HttpRequest(url);
          if (!StringUtils.isBlank(secret)) {
            Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
            request.setHeaders(headers);
          }

          Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
          transaction.addData("Url", url);
          try {
            HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);
            m_configNeedForceRefresh.set(false);
            m_loadConfigFailSchedulePolicy.success();

            transaction.addData("StatusCode", response.getStatusCode());
            transaction.setStatus(Transaction.SUCCESS);

            // 304: nothing changed, return the cached configuration.
            if (response.getStatusCode() == 304) {
              logger.debug("Config server responds with 304 HTTP status code.");
              return m_configCache.get();
            }

            ApolloConfig result = response.getBody();

            logger.debug("Loaded config for {}: {}", m_namespace, result);

            return result;
          } catch (ApolloConfigStatusCodeException ex) {
            ApolloConfigStatusCodeException statusCodeException = ex;
            // 404: the namespace has no released configuration.
            if (ex.getStatusCode() == 404) {
              String message = String.format(
                  "Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
                      "please check whether the configs are released in Apollo!",
                  appId, cluster, m_namespace);
              statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(), message);
            }
            Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
            transaction.setStatus(statusCodeException);
            exception = statusCodeException;
            if (ex.getStatusCode() == 404) {
              break retryLoopLabel;
            }
          } catch (Throwable ex) {
            Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
            transaction.setStatus(ex);
            exception = ex;
          } finally {
            transaction.complete();
          }

          // Fixed interval while a forced refresh is pending, otherwise the backoff policy decides.
          onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
              m_loadConfigFailSchedulePolicy.fail();
        }
      }
      String message = String.format(
          "Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
          appId, cluster, m_namespace, url);
      throw new ApolloConfigException(message, exception);
    }
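loadApolloConfig relies on m_loadConfigFailSchedulePolicy.fail() to decide how long to wait before the next attempt. The source of ExponentialSchedulePolicy is not shown in this article, so the following is only a sketch of how such a policy could work, assuming the delay starts at a lower bound and doubles on every consecutive failure up to an upper bound. `BackoffPolicy` is a hypothetical class written for this illustration.

```java
// Hypothetical backoff policy in the spirit of the SchedulePolicy used above.
class BackoffPolicy {
    private final long lowerBound;
    private final long upperBound;
    private long lastDelay; // 0 means "no failure since the last success"

    BackoffPolicy(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    // Returns the delay after a failure: lowerBound first, then doubling, capped at upperBound.
    long fail() {
        lastDelay = (lastDelay == 0) ? lowerBound : Math.min(lastDelay * 2, upperBound);
        return lastDelay;
    }

    // Resets the policy so the next failure starts from lowerBound again.
    void success() {
        lastDelay = 0;
    }

    public static void main(String[] args) {
        // Same bounds as the (1, 120) policy created in RemoteConfigLongPollService.
        BackoffPolicy policy = new BackoffPolicy(1, 120);
        for (int i = 0; i < 9; i++) {
            System.out.print(policy.fail() + " ");
        }
        System.out.println();
    }
}
```

A bounded exponential delay keeps a failing client from hammering the server while still retrying quickly after a short outage.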
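If the configuration has changed, LocalFileConfigRepository.onRepositoryChange is called back, which synchronizes the latest configuration to LocalFileConfigRepository. After updating its local file cache, LocalFileConfigRepository in turn calls back DefaultConfig.onRepositoryChange.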
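The callback chain described above can be sketched without any Apollo classes. In this simplified model, `RepoListener`, `NotifyingRepo`, `FakeRemoteRepo` and `FakeLocalRepo` are hypothetical names; the local layer registers itself with the remote layer, caches what it receives, and forwards the event only when the properties actually differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical listener interface, modelled on the onRepositoryChange callbacks above.
interface RepoListener {
    void onRepositoryChange(String namespace, Properties newProperties);
}

// Base class that keeps listeners and notifies them in registration order.
abstract class NotifyingRepo {
    private final List<RepoListener> listeners = new ArrayList<>();

    void addChangeListener(RepoListener listener) {
        listeners.add(listener);
    }

    protected void fireRepositoryChange(String namespace, Properties props) {
        for (RepoListener listener : listeners) {
            listener.onRepositoryChange(namespace, props);
        }
    }
}

// Stand-in for the remote layer: a sync that detects a change fires an event.
class FakeRemoteRepo extends NotifyingRepo {
    void simulateSync(String namespace, Properties latest) {
        fireRepositoryChange(namespace, latest);
    }
}

// Stand-in for the local-file layer: caches what it receives and forwards real changes only.
class FakeLocalRepo extends NotifyingRepo implements RepoListener {
    private Properties cached = new Properties();

    FakeLocalRepo(FakeRemoteRepo upstream) {
        upstream.addChangeListener(this);
    }

    @Override
    public void onRepositoryChange(String namespace, Properties newProperties) {
        if (newProperties.equals(cached)) {
            return; // nothing changed, do not propagate
        }
        Properties copy = new Properties();
        copy.putAll(newProperties);
        cached = copy; // the real class would also write the local cache file here
        fireRepositoryChange(namespace, newProperties);
    }
}
```

A top-level consumer (the role played by DefaultConfig) would register with `FakeLocalRepo.addChangeListener` and therefore only ever sees deduplicated changes.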
schedulePeriodicRefresh :
    private void schedulePeriodicRefresh() {
      logger.debug("Schedule periodic refresh with interval: {} {}",
          m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
      m_executorService.scheduleAtFixedRate(
          new Runnable() {
            @Override
            public void run() {
              Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
              logger.debug("refresh config for namespace: {}", m_namespace);
              trySync();
              Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
            }
          }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
          m_configUtil.getRefreshIntervalTimeUnit());
    }
scheduleLongPollingRefresh :

    private void scheduleLongPollingRefresh() {
      // Register this repository with the long-polling service.
      remoteConfigLongPollService.submit(m_namespace, this);
    }

    public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto,
        ApolloNotificationMessages remoteMessages) {
      m_longPollServiceDto.set(longPollNotifiedServiceDto);
      m_remoteMessages.set(remoteMessages);
      m_executorService.submit(new Runnable() {
        @Override
        public void run() {
          m_configNeedForceRefresh.set(true);
          trySync();
        }
      });
    }
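2.4.2 RemoteConfigLongPollService

RemoteConfigLongPollService is the remote configuration long-polling service. It long-polls the Apollo Server's configuration change notification endpoint, /notifications/v2. When a new notification arrives, it triggers RemoteConfigRepository.onLongPollNotified, which immediately queries the Apollo Server's configuration read endpoint, /configs/{appId}/{clusterName}/{namespace:.+}.

Constructor: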
    public class RemoteConfigLongPollService {
      private static final Logger logger = LoggerFactory.getLogger(RemoteConfigLongPollService.class);
      private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
      private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
      private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();
      private static final long INIT_NOTIFICATION_ID = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
      private static final int LONG_POLLING_READ_TIMEOUT = 90 * 1000;

      private final ExecutorService m_longPollingService;
      private final AtomicBoolean m_longPollingStopped;
      private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
      private RateLimiter m_longPollRateLimiter;
      private final AtomicBoolean m_longPollStarted;
      private final Multimap<String, RemoteConfigRepository> m_longPollNamespaces;
      private final ConcurrentMap<String, Long> m_notifications;
      private final Map<String, ApolloNotificationMessages> m_remoteNotificationMessages;
      private Type m_responseType;
      private static final Gson GSON = new Gson();
      private ConfigUtil m_configUtil;
      private HttpClient m_httpClient;
      private ConfigServiceLocator m_serviceLocator;
      private final ConfigServiceLoadBalancerClient configServiceLoadBalancerClient =
          ServiceBootstrap.loadPrimary(ConfigServiceLoadBalancerClient.class);

      public RemoteConfigLongPollService() {
        m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120);
        m_longPollingStopped = new AtomicBoolean(false);
        m_longPollingService = Executors.newSingleThreadExecutor(
            ApolloThreadFactory.create("RemoteConfigLongPollService", true));
        m_longPollStarted = new AtomicBoolean(false);
        m_longPollNamespaces =
            Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create());
        m_notifications = Maps.newConcurrentMap();
        m_remoteNotificationMessages = Maps.newConcurrentMap();
        m_responseType = new TypeToken<List<ApolloConfigNotification>>() {
        }.getType();
        m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
        m_httpClient = ApolloInjector.getInstance(HttpClient.class);
        m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
        m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
      }
    }
submit :
    public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
      boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
      m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
      if (!m_longPollStarted.get()) {
        startLongPolling();
      }
      return added;
    }
startLongPolling :
    private void startLongPolling() {
      // compareAndSet guarantees the polling loop is started only once.
      if (!m_longPollStarted.compareAndSet(false, true)) {
        return;
      }
      try {
        final String appId = m_configUtil.getAppId();
        final String cluster = m_configUtil.getCluster();
        final String dataCenter = m_configUtil.getDataCenter();
        final String secret = m_configUtil.getAccessKeySecret();
        final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
        m_longPollingService.submit(new Runnable() {
          @Override
          public void run() {
            if (longPollingInitialDelayInMills > 0) {
              try {
                logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
                TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
              } catch (InterruptedException e) {
                // ignore
              }
            }
            doLongPollingRefresh(appId, cluster, dataCenter, secret);
          }
        });
      } catch (Throwable ex) {
        m_longPollStarted.set(false);
        ApolloConfigException exception =
            new ApolloConfigException("Schedule long polling refresh failed", ex);
        Tracer.logError(exception);
        logger.warn(ExceptionUtil.getDetailMessage(exception));
      }
    }
doLongPollingRefresh :
    private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
      ServiceDTO lastServiceDto = null;
      while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
        // Rate limiting: wait up to 5 seconds for a permit.
        if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
          try {
            TimeUnit.SECONDS.sleep(5);
          } catch (InterruptedException e) {
            // ignore
          }
        }
        Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
        String url = null;
        try {
          if (lastServiceDto == null) {
            lastServiceDto = this.resolveConfigService();
          }

          url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
              m_notifications);

          logger.debug("Long polling from {}", url);

          HttpRequest request = new HttpRequest(url);
          request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
          if (!StringUtils.isBlank(secret)) {
            Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
            request.setHeaders(headers);
          }

          transaction.addData("Url", url);

          final HttpResponse<List<ApolloConfigNotification>> response =
              m_httpClient.doGet(request, m_responseType);

          logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
          // 200: something changed, record the new notification ids and notify the repositories.
          if (response.getStatusCode() == 200 && response.getBody() != null) {
            updateNotifications(response.getBody());
            updateRemoteNotifications(response.getBody());
            transaction.addData("Result", response.getBody().toString());
            notify(lastServiceDto, response.getBody());
          }

          // 304: nothing changed; randomly switch to another server for the next round.
          if (response.getStatusCode() == 304 && ThreadLocalRandom.current().nextBoolean()) {
            lastServiceDto = null;
          }

          m_longPollFailSchedulePolicyInSecond.success();
          transaction.addData("StatusCode", response.getStatusCode());
          transaction.setStatus(Transaction.SUCCESS);
        } catch (Throwable ex) {
          lastServiceDto = null;
          Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
          transaction.setStatus(ex);
          long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
          logger.warn(
              "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
              sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
          try {
            TimeUnit.SECONDS.sleep(sleepTimeInSecond);
          } catch (InterruptedException ie) {
            // ignore
          }
        } finally {
          transaction.complete();
        }
      }
    }
notify :
    private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
      if (notifications == null || notifications.isEmpty()) {
        return;
      }
      for (ApolloConfigNotification notification : notifications) {
        String namespaceName = notification.getNamespaceName();
        List<RemoteConfigRepository> toBeNotified =
            Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
        ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
        ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
        toBeNotified.addAll(m_longPollNamespaces
            .get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
        for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
          try {
            remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
          } catch (Throwable ex) {
            Tracer.logError(ex);
          }
        }
      }
    }
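This completes the analysis of how RemoteConfigRepository pulls configuration from the remote server. When a RemoteConfigRepository object is created during Spring startup, it attempts a first pull of the namespace's configuration and then sets up the periodic pull task and the long-polling task. Long polling is started through RemoteConfigLongPollService#startLongPolling. If the server-side configuration changes, RemoteConfigRepository#onLongPollNotified is called back, and that method (through trySync) calls RemoteConfigRepository#sync to pull the namespace's remote configuration again.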
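The long-polling idea itself can be simulated in memory. In the sketch below, a blocking queue plays the role of the server holding a request open until a notification arrives or a timeout elapses. `FakeNotificationServer` and `LongPollClient` are hypothetical classes, and the initial id of -1 is an assumed placeholder; the real client issues HTTP requests and tracks one notification id per namespace.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for the notification endpoint.
class FakeNotificationServer {
    private final BlockingQueue<Long> pending = new LinkedBlockingQueue<>();

    // Called when a new release is published.
    void publish(long notificationId) {
        pending.add(notificationId);
    }

    // Holds the "request" until a notification arrives or the timeout elapses.
    // Returns null on timeout, which plays the role of a 304 response.
    Long poll(long timeoutMillis) {
        try {
            return pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

class LongPollClient {
    private long lastNotificationId = -1; // assumed placeholder before the first notification
    private int syncCount = 0;

    // One iteration of the polling loop; returns true if a change was observed.
    boolean pollOnce(FakeNotificationServer server, long timeoutMillis) {
        Long id = server.poll(timeoutMillis);
        if (id == null || id <= lastNotificationId) {
            return false; // timeout or stale notification: nothing to do
        }
        lastNotificationId = id;
        syncCount++; // the real client would trigger a configuration sync here
        return true;
    }

    int syncCount() {
        return syncCount;
    }
}
```

Because the server answers as soon as something is published, the client learns about a change with little delay while issuing only one request per timeout window when nothing changes.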
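2.4.3 LocalFileConfigRepository

As mentioned above, when the server-side configuration changes, RemoteConfigRepository receives the change notification and calls sync to synchronize. If the configuration actually changed, it goes on to call back LocalFileConfigRepository#onRepositoryChange: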
    @Override
    public void onRepositoryChange(String namespace, Properties newProperties) {
      // Nothing changed compared with the local copy.
      if (newProperties.equals(m_fileProperties)) {
        return;
      }
      Properties newFileProperties = propertiesFactory.getPropertiesInstance();
      newFileProperties.putAll(newProperties);
      // Update the local file cache, then notify this repository's own listeners.
      updateFileProperties(newFileProperties, m_upstream.getSourceType());
      this.fireRepositoryChange(namespace, newProperties);
    }
2.4.4 DefaultConfig

After LocalFileConfigRepository receives the change notification from RemoteConfigRepository and updates the local configuration file, it goes on to call back DefaultConfig#onRepositoryChange:
    @Override
    public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
      if (newProperties.equals(m_configProperties.get())) {
        return;
      }

      ConfigSourceType sourceType = m_configRepository.getSourceType();
      Properties newConfigProperties = propertiesFactory.getPropertiesInstance();
      newConfigProperties.putAll(newProperties);

      // Compute the per-key changes and update the in-memory properties.
      Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties,
          sourceType);

      // No effective change.
      if (actualChanges.isEmpty()) {
        return;
      }

      // Notify the ConfigChangeListeners.
      this.fireConfigChange(m_namespace, actualChanges);

      Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
    }
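DefaultConfig first computes the properties that actually changed, for example when we modify test.hello and add a new test.hello3 property. It then sends a property change notification. Note that, unlike the Repository layer, which publishes a change event for the whole repository, this layer publishes changes at the level of individual properties. Repository change events are handled by implementing RepositoryChangeListener; property change events are handled by implementing ConfigChangeListener.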
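The body of updateAndCalcConfigChanges is not shown in this article. A minimal sketch of the per-key diff it has to produce is given below, using the test.hello and test.hello3 example from the text. `ConfigDiff` and its `ChangeType` enum are hypothetical names for illustration and do not reproduce Apollo's ConfigChange model.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper that classifies the differences between two property sets.
class ConfigDiff {
    enum ChangeType { ADDED, MODIFIED, DELETED }

    // Returns one entry per key whose value differs between previous and current.
    static Map<String, ChangeType> diff(Properties previous, Properties current) {
        Map<String, ChangeType> changes = new LinkedHashMap<>();
        // Union of both key sets, sorted for a stable iteration order.
        TreeSet<String> keys = new TreeSet<>(previous.stringPropertyNames());
        keys.addAll(current.stringPropertyNames());
        for (String key : keys) {
            String oldValue = previous.getProperty(key);
            String newValue = current.getProperty(key);
            if (oldValue == null) {
                changes.put(key, ChangeType.ADDED);
            } else if (newValue == null) {
                changes.put(key, ChangeType.DELETED);
            } else if (!oldValue.equals(newValue)) {
                changes.put(key, ChangeType.MODIFIED);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Properties before = new Properties();
        before.setProperty("test.hello", "old");
        Properties after = new Properties();
        after.setProperty("test.hello", "new");
        after.setProperty("test.hello3", "added");
        System.out.println(diff(before, after));
    }
}
```

Keys whose values are unchanged produce no entry, which is why an empty result lets onRepositoryChange return without notifying any listener.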
3. How Apollo updates Spring bean properties in real time

Using the Apollo client in Spring Boot generally requires enabling the @EnableApolloConfig annotation:
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    @Import(ApolloConfigRegistrar.class)
    public @interface EnableApolloConfig {

      // Namespaces to load; defaults to the application namespace.
      String[] value() default {ConfigConsts.NAMESPACE_APPLICATION};

      // Order of the Apollo config among property sources.
      int order() default Ordered.LOWEST_PRECEDENCE;
    }
The @Import annotation brings in ApolloConfigRegistrar:
    public class ApolloConfigRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {

      private final ApolloConfigRegistrarHelper helper = ServiceBootstrap.loadPrimary(ApolloConfigRegistrarHelper.class);

      @Override
      public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        helper.registerBeanDefinitions(importingClassMetadata, registry);
      }

      @Override
      public void setEnvironment(Environment environment) {
        this.helper.setEnvironment(environment);
      }
    }
ApolloConfigRegistrar passes the Environment to its helper and, in registerBeanDefinitions, delegates to ApolloConfigRegistrarHelper.registerBeanDefinitions:
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
      AnnotationAttributes attributes = AnnotationAttributes
          .fromMap(importingClassMetadata.getAnnotationAttributes(EnableApolloConfig.class.getName()));
      final String[] namespaces = attributes.getStringArray("value");
      final int order = attributes.getNumber("order");
      final String[] resolvedNamespaces = this.resolveNamespaces(namespaces);
      // Record the namespaces declared on @EnableApolloConfig for PropertySourcesProcessor.
      PropertySourcesProcessor.addNamespaces(Lists.newArrayList(resolvedNamespaces), order);

      Map<String, Object> propertySourcesPlaceholderPropertyValues = new HashMap<>();
      propertySourcesPlaceholderPropertyValues.put("order", 0);

      // Register the beans Apollo needs, unless they already exist.
      BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesPlaceholderConfigurer.class,
          propertySourcesPlaceholderPropertyValues);
      BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, AutoUpdateConfigChangeListener.class);
      BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesProcessor.class);
      BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, ApolloAnnotationProcessor.class);
      BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, SpringValueProcessor.class);
      BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, SpringValueDefinitionProcessor.class);
    }
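PropertySourcesProcessor is one of Apollo's most important components, and it is instantiated with the highest priority. PropertySourcesProcessor#postProcessBeanFactory() is implemented as follows: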
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
      this.configUtil = ApolloInjector.getInstance(ConfigUtil.class);
      initializePropertySources();
      initializeAutoUpdatePropertiesFeature(beanFactory);
    }
It calls PropertySourcesProcessor#initializePropertySources() to pull the remote namespace configuration.
It calls PropertySourcesProcessor#initializeAutoUpdatePropertiesFeature() to add a listener to every locally cached Config:
    private void initializeAutoUpdatePropertiesFeature(ConfigurableListableBeanFactory beanFactory) {
      // Initialize the feature only once per bean factory.
      if (!AUTO_UPDATE_INITIALIZED_BEAN_FACTORIES.add(beanFactory)) {
        return;
      }

      // Republish Apollo's change events as Spring application events.
      ConfigChangeListener configChangeEventPublisher = changeEvent ->
          applicationEventPublisher.publishEvent(new ApolloConfigChangeEvent(changeEvent));

      List<ConfigPropertySource> configPropertySources = configPropertySourceFactory.getAllConfigPropertySources();
      for (ConfigPropertySource configPropertySource : configPropertySources) {
        configPropertySource.addChangeListener(configChangeEventPublisher);
      }
    }
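As seen earlier, ConfigPropertySource is a wrapper class, so the this.source.addChangeListener(listener) call inside ConfigPropertySource#addChangeListener actually invokes DefaultConfig#addChangeListener(). When DefaultConfig receives a change notification from LocalFileConfigRepository, the listener registered here publishes an ApolloConfigChangeEvent: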
    ConfigChangeListener configChangeEventPublisher = changeEvent ->
        applicationEventPublisher.publishEvent(new ApolloConfigChangeEvent(changeEvent));
DefaultApolloConfigRegistrarHelper#registerBeanDefinitions registers an AutoUpdateConfigChangeListener bean in the IoC container. This listener listens for ApolloConfigChangeEvent, and when a property changes, AutoUpdateConfigChangeListener#onChange is called:
    @Override
    public void onChange(ConfigChangeEvent changeEvent) {
      Set<String> keys = changeEvent.changedKeys();
      if (CollectionUtils.isEmpty(keys)) {
        return;
      }
      for (String key : keys) {
        // Look up the bean fields and methods that depend on this key.
        Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
        if (targetValues == null || targetValues.isEmpty()) {
          continue;
        }

        for (SpringValue val : targetValues) {
          updateSpringValue(val);
        }
      }
    }
onChange calls updateSpringValue for each affected SpringValue:
    private void updateSpringValue(SpringValue springValue) {
      try {
        Object value = resolvePropertyValue(springValue);
        springValue.update(value);

        logger.info("Auto update apollo changed value successfully, new value: {}, {}", value,
            springValue);
      } catch (Throwable ex) {
        logger.error("Auto update apollo changed value failed, {}", springValue.toString(), ex);
      }
    }
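First, AutoUpdateConfigChangeListener#resolvePropertyValue() uses Spring's own components to replace the placeholder configured in @Value with the value of the corresponding key in the PropertySource. This touches on Spring's property injection mechanism during bean creation, which is fairly involved and is not analyzed further here.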
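To give a feel for what resolving a placeholder means, here is a deliberately simplified sketch. It is not Spring's resolver: `PlaceholderSketch` is a hypothetical class that only handles flat `${key}` and `${key:default}` forms, with a plain map standing in for the property sources.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified placeholder resolver; nested placeholders are not supported.
class PlaceholderSketch {
    // Matches ${key} or ${key:default}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}:]+)(?::([^}]*))?\\}");

    // Replaces every placeholder in text with the property value, or with its default if the key is missing.
    static String resolve(String text, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuilder result = new StringBuilder();
        int last = 0;
        while (matcher.find()) {
            result.append(text, last, matcher.start());
            String key = matcher.group(1);
            String value = properties.get(key);
            if (value == null) {
                value = matcher.group(2); // default after the colon, may be null
            }
            if (value == null) {
                throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
            }
            result.append(value);
            last = matcher.end();
        }
        result.append(text, last, text.length());
        return result.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("test.hello", "hahhahaha12311");
        System.out.println(resolve("${test.hello}", props));
        System.out.println(resolve("${test.missing:fallback}", props));
    }
}
```

After a configuration change, re-running the resolution against the updated property sources yields the new value that is then written into the bean.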
Then SpringValue#update() uses reflection to modify the bean's member variable at runtime, which completes the automatic update:
    public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
      if (isField()) {
        injectField(newVal);
      } else {
        injectMethod(newVal);
      }
    }

    private void injectField(Object newVal) throws IllegalAccessException {
      Object bean = beanRef.get();
      // The bean is gone (for example garbage collected): nothing to update.
      if (bean == null) {
        return;
      }
      boolean accessible = field.isAccessible();
      field.setAccessible(true);
      field.set(bean, newVal);
      field.setAccessible(accessible);
    }
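The reflective write can be tried in isolation with the standard java.lang.reflect API. In this sketch, `ReflectiveUpdate` and `DemoController` are hypothetical; the private `hello` field stands in for a member injected with @Value, and reflection errors are wrapped in an unchecked exception to keep the example short.

```java
import java.lang.reflect.Field;

class ReflectiveUpdate {

    // Example bean; the private field stands in for an @Value-injected member.
    static class DemoController {
        private String hello = "old";

        String hello() {
            return hello;
        }
    }

    // Sets a (possibly private) instance field declared on the bean's own class.
    static void injectField(Object bean, String fieldName, Object newValue) {
        try {
            Field field = bean.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // bypass the private modifier
            field.set(bean, newValue);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not update field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        DemoController bean = new DemoController();
        injectField(bean, "hello", "hahhahaha12311");
        System.out.println(bean.hello());
    }
}
```

The update is applied to the existing bean instance, so no bean is re-created; code that copied the old value into another variable earlier will not see the change.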
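4. Summary

At startup, Apollo begins loading remote configuration from the ApolloApplicationContextInitializer extension point. The Apollo client obtains configuration through a layered design: DefaultConfig -> LocalFileConfigRepository -> RemoteConfigRepository. RemoteConfigRepository is ultimately responsible for pulling the remote configuration and notifying LocalFileConfigRepository. LocalFileConfigRepository caches the remote configuration in a local file; this layer exists mainly to keep business services available when the Apollo Server is not, and it notifies DefaultConfig when its configuration changes. DefaultConfig caches the Apollo configuration in memory, and when its configuration changes, AutoUpdateConfigChangeListener#onChange is called back to update the properties of the Java beans.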
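To pick up Apollo Server changes in real time, the Apollo client relies on the two mechanisms described in section 2.3 to detect server-side configuration changes: HTTP long polling for change notifications, and periodic pulling as a fallback.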
