## Loading Plugins

The Plugins constructor is called to initialize the Plugins object:
```java
Plugins plugins = new Plugins(workerProps);
```
In the constructor, the value of the plugin.path configuration is read and used as the list of directories to load plugins from:
```java
List<String> pluginLocations = WorkerConfig.pluginLocations(props);

public static List<String> pluginLocations(Map<String, String> props) {
    String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
    return locationList == null
                   ? new ArrayList<String>()
                   : Arrays.asList(COMMA_WITH_WHITESPACE.split(locationList.trim(), -1));
}
```
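As a quick illustration of the splitting behavior, here is a self-contained sketch; the COMMA_WITH_WHITESPACE pattern below is an assumption about Connect's actual regex, and the paths are made up:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class PluginPathSplitExample {
    // Assumed equivalent of Connect's COMMA_WITH_WHITESPACE:
    // a comma with optional surrounding whitespace.
    private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");

    public static void main(String[] args) {
        // plugin.path is a comma-separated list of plugin directories.
        String locationList = "/opt/connectors, /usr/share/java/plugins";
        List<String> locations =
                Arrays.asList(COMMA_WITH_WHITESPACE.split(locationList.trim(), -1));
        System.out.println(locations); // [/opt/connectors, /usr/share/java/plugins]
    }
}
```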
Next, the paths obtained above are used to initialize the DelegatingClassLoader, whose initLoaders method then loads the plugins:
```java
delegatingLoader = newDelegatingClassLoader(pluginLocations);
delegatingLoader.initLoaders();
```
initLoaders first calls initPluginLoader to initialize a PluginLoader for each path, then calls addAllAliases to register aliases for the plugins:
```java
for (String configPath : pluginPaths) {
    initPluginLoader(configPath);
}
// Finally add parent/system loader.
initPluginLoader(CLASSPATH_NAME);
addAllAliases();
```
In initPluginLoader, paths other than CLASSPATH_NAME first go through registerPlugin, which initializes a PluginClassLoader; from there the processing is the same as for the CLASSPATH_NAME path. The call chain is scanUrlsAndAddPlugins -> scanPluginPath -> getPluginDesc / getServiceLoaderPluginDesc, which discovers implementations of Connector, Converter, HeaderConverter, Transformation, ConfigProvider, ConnectRestExtension, and ConnectorClientConfigOverridePolicy on the path; afterwards scanUrlsAndAddPlugins -> loadJdbcDrivers loads any JDBC drivers found:
```java
private void scanUrlsAndAddPlugins(
        ClassLoader loader,
        URL[] urls,
        Path pluginLocation
) throws InstantiationException, IllegalAccessException {
    PluginScanResult plugins = scanPluginPath(loader, urls);
    log.info("Registered loader: {}", loader);
    if (!plugins.isEmpty()) {
        ......
    }
    // Load JDBC drivers
    loadJdbcDrivers(loader);
}

private PluginScanResult scanPluginPath(
        ClassLoader loader,
        URL[] urls
) throws InstantiationException, IllegalAccessException {
    ConfigurationBuilder builder = new ConfigurationBuilder();
    builder.setClassLoaders(new ClassLoader[]{loader});
    builder.addUrls(urls);
    builder.setScanners(new SubTypesScanner());
    builder.useParallelExecutor();
    Reflections reflections = new InternalReflections(builder);

    return new PluginScanResult(
            getPluginDesc(reflections, Connector.class, loader),
            getPluginDesc(reflections, Converter.class, loader),
            getPluginDesc(reflections, HeaderConverter.class, loader),
            getPluginDesc(reflections, Transformation.class, loader),
            getServiceLoaderPluginDesc(ConfigProvider.class, loader),
            getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
            getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
    );
}

private <T> Collection<PluginDesc<T>> getPluginDesc(
        Reflections reflections,
        Class<T> klass,
        ClassLoader loader
) throws InstantiationException, IllegalAccessException {
    Set<Class<? extends T>> plugins;
    try {
        plugins = reflections.getSubTypesOf(klass);
    } catch (ReflectionsException e) {
        log.debug("Reflections scanner could not find any classes for URLs: " +
                reflections.getConfiguration().getUrls(), e);
        return Collections.emptyList();
    }
    ......
    return result;
}

private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
    ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
    Collection<PluginDesc<T>> result = new ArrayList<>();
    try {
        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
        for (T pluginImpl : serviceLoader) {
            result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(),
                    versionFor(pluginImpl), loader));
        }
    } finally {
        Plugins.compareAndSwapLoaders(savedLoader);
    }
    return result;
}
```
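The getServiceLoaderPluginDesc half of this discovery rests on plain java.util.ServiceLoader. A minimal sketch of that mechanism, where MyPlugin is a hypothetical interface standing in for ConfigProvider or ConnectRestExtension (real providers must additionally be listed in a META-INF/services file on the class path):

```java
import java.util.ServiceLoader;

// Hypothetical plugin contract; Connect uses real interfaces such as ConfigProvider.
interface MyPlugin {
    String name();
}

public class ServiceLoaderExample {
    public static void main(String[] args) {
        ClassLoader loader = ServiceLoaderExample.class.getClassLoader();
        // ServiceLoader.load scans META-INF/services entries visible to the
        // given class loader -- the same call getServiceLoaderPluginDesc makes.
        ServiceLoader<MyPlugin> plugins = ServiceLoader.load(MyPlugin.class, loader);
        for (MyPlugin p : plugins) {
            System.out.println("Discovered plugin: " + p.name());
        }
    }
}
```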
## Instantiating the Config

The Config loading logic is much simpler: ConfigDef.parse ultimately converts the supplied Map into typed key-value pairs.
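A minimal sketch of what ConfigDef.parse does, using a made-up single-key schema (the key name and default below are illustrative, not Connect's actual definition):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ConfigParseExample {
    public static void main(String[] args) {
        // Declare a schema; Connect's DistributedConfig builds a much larger
        // ConfigDef the same way.
        ConfigDef def = new ConfigDef()
                .define("offset.flush.interval.ms", Type.LONG, 60000L,
                        Importance.LOW, "Interval between offset flushes");

        Map<String, String> raw = new HashMap<>();
        raw.put("offset.flush.interval.ms", "10000");

        // parse() validates each value and converts it to its declared type.
        Map<String, Object> parsed = def.parse(raw);
        System.out.println(parsed.get("offset.flush.interval.ms")); // 10000 (as a Long)
    }
}
```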
## Cluster ID

To obtain the cluster ID, a KafkaAdminClient is first created via KafkaAdminClient.createInternal:
```java
Admin adminClient = Admin.create(config.originals());

static Admin create(Map<String, Object> conf) {
    return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
}
```
KafkaAdminClient.describeCluster().clusterId() is then called to obtain the cluster ID; the value is actually filled in by the callback that handles the broker's metadata response:
```java
void handleResponse(AbstractResponse abstractResponse) {
    MetadataResponse response = (MetadataResponse) abstractResponse;
    describeClusterFuture.complete(response.brokers());
    controllerFuture.complete(controller(response));
    clusterIdFuture.complete(response.clusterId());
    authorizedOperationsFuture.complete(
            validAclOperations(response.clusterAuthorizedOperations()));
}
```
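From a client's point of view the whole exchange collapses into a couple of calls. A minimal sketch, assuming a broker on localhost:9092:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterIdExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // clusterId() returns a KafkaFuture that the metadata-response
            // callback shown above eventually completes.
            String clusterId = admin.describeCluster().clusterId().get();
            System.out.println("Kafka cluster ID: " + clusterId);
        }
    }
}
```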
## REST Initialization

First, the Config is passed to the RestServer constructor, which builds the listener addresses from the configured hostname and port and reads the admin listener addresses from the configuration:
```java
RestServer rest = new RestServer(config);

// In the RestServer constructor
List<String> listeners = parseListeners();
List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
```
Next, the Jetty Server object and the context handler collection are created:
```java
jettyServer = new Server();
handlers = new ContextHandlerCollection();
```
Then a Connector is created for each listener and admin listener and added to the Jetty Server. For HTTPS listeners, an SSL/TLS context is built from the configuration; if the configuration contains no SSL settings, a default SSL/TLS context is used.
```java
createConnectors(listeners, adminListeners);

public Connector createConnector(String listener, boolean isAdmin) {
    Matcher listenerMatcher = LISTENER_PATTERN.matcher(listener);

    if (!listenerMatcher.matches())
        throw new ConfigException("Listener doesn't have the right format (protocol://hostname:port).");

    String protocol = listenerMatcher.group(1).toLowerCase(Locale.ENGLISH);

    if (!PROTOCOL_HTTP.equals(protocol) && !PROTOCOL_HTTPS.equals(protocol))
        throw new ConfigException(String.format("Listener protocol must be either \"%s\" or \"%s\".",
                PROTOCOL_HTTP, PROTOCOL_HTTPS));

    String hostname = listenerMatcher.group(2);
    int port = Integer.parseInt(listenerMatcher.group(3));

    ServerConnector connector;

    if (PROTOCOL_HTTPS.equals(protocol)) {
        SslContextFactory ssl;
        if (isAdmin) {
            ssl = SSLUtils.createServerSideSslContextFactory(config, ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX);
        } else {
            ssl = SSLUtils.createServerSideSslContextFactory(config);
        }
        connector = new ServerConnector(jettyServer, ssl);
        if (!isAdmin) {
            connector.setName(String.format("%s_%s%d", PROTOCOL_HTTPS, hostname, port));
        }
    } else {
        connector = new ServerConnector(jettyServer);
        if (!isAdmin) {
            connector.setName(String.format("%s_%s%d", PROTOCOL_HTTP, hostname, port));
        }
    }

    if (isAdmin) {
        connector.setName(ADMIN_SERVER_CONNECTOR_NAME);
    }

    if (!hostname.isEmpty())
        connector.setHost(hostname);

    connector.setPort(port);

    return connector;
}
```
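To make the listener parsing concrete, here is a simplified stand-in for LISTENER_PATTERN (the real pattern also accepts bracketed IPv6 hosts and empty hostnames):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ListenerParseExample {
    // Simplified stand-in for RestServer's LISTENER_PATTERN.
    private static final Pattern LISTENER_PATTERN =
            Pattern.compile("^(\\w+)://([^:]*):(\\d+)$");

    public static void main(String[] args) {
        Matcher m = LISTENER_PATTERN.matcher("https://myhost:8083");
        if (m.matches()) {
            System.out.println("protocol=" + m.group(1)); // https
            System.out.println("hostname=" + m.group(2)); // myhost
            System.out.println("port=" + m.group(3));     // 8083
        }
    }
}
```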
Finally, initializeServer is called to set up the static resource handlers and start the Jetty Server.
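Putting the Jetty pieces together, here is a minimal runnable sketch of the same setup for a plain HTTP listener (8083 is Connect's default REST port; no REST resources are mounted here, so every request will 404):

```java
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;

public class EmbeddedJettySketch {
    public static void main(String[] args) throws Exception {
        Server server = new Server();

        // Mirrors createConnector() for an http listener.
        ServerConnector connector = new ServerConnector(server);
        connector.setName("http_localhost8083");
        connector.setHost("localhost");
        connector.setPort(8083);
        server.addConnector(connector);

        // Mirrors the ContextHandlerCollection created in the constructor.
        ContextHandlerCollection handlers = new ContextHandlerCollection();
        server.setHandler(handlers);

        server.start();
        server.join();
    }
}
```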
## Configuring the Offset Store

KafkaOffsetBackingStore uses a Kafka topic to store the offsets of the topics consumed by connectors. The topic name is taken from the offset.storage.topic configuration key and defaults to connect-offsets. KafkaOffsetBackingStore creates a producer to write offset updates to this topic and a consumer to pick up offset updates made by other Connect worker nodes. The latest offsets are cached in a HashMap named data:
```java
private HashMap<ByteBuffer, ByteBuffer> data;
```
The callback that receives offset updates is:
```java
private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback =
        new Callback<ConsumerRecord<byte[], byte[]>>() {
    @Override
    public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
        ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
        data.put(key, value);
    }
};
```
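Note that keys and values are wrapped in ByteBuffer rather than kept as raw byte[]. A likely reason (the source does not spell it out) is that byte[] has identity-based equals/hashCode and so cannot serve as a HashMap key, while ByteBuffer compares by content:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class ByteBufferKeyExample {
    public static void main(String[] args) {
        Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
        data.put(ByteBuffer.wrap("key".getBytes()), ByteBuffer.wrap("42".getBytes()));

        // A fresh ByteBuffer with the same content still hits, because
        // ByteBuffer.equals/hashCode are content-based; a raw byte[] key would miss.
        System.out.println(data.containsKey(ByteBuffer.wrap("key".getBytes()))); // true
    }
}
```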
## Instantiating the Worker

The Worker owns all Connector and Task instances on the node; creating, starting, and stopping connectors all go through it. Its constructor contains no special logic and mainly holds references to the objects created earlier.
## Configuring the Status Store

KafkaStatusBackingStore follows the same pattern as the offset store; the topic that stores statuses defaults to connect-status. It maintains the statuses of connectors, tasks, and topics:
```java
protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;
```
Its read callback is:
```java
Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
    @Override
    public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
        read(record);
    }
};

void read(ConsumerRecord<String, byte[]> record) {
    String key = record.key();
    if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
        readConnectorStatus(key, record.value());
    } else if (key.startsWith(TASK_STATUS_PREFIX)) {
        readTaskStatus(key, record.value());
    } else if (key.startsWith(TOPIC_STATUS_PREFIX)) {
        readTopicStatus(key, record.value());
    } else {
        log.warn("Discarding record with invalid key {}", key);
    }
}
```
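The record keys encode the entity type as a prefix, which is what the dispatch above branches on. A minimal sketch of that dispatch; the prefix literals below are assumptions about the on-topic key format, not verified constants:

```java
public class StatusKeyExample {
    // Assumed values of the prefixes referenced in read() above.
    static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
    static final String TASK_STATUS_PREFIX = "status-task-";

    public static void main(String[] args) {
        String key = CONNECTOR_STATUS_PREFIX + "my-jdbc-source";
        if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
            // The remainder of the key identifies the connector.
            String connectorName = key.substring(CONNECTOR_STATUS_PREFIX.length());
            System.out.println("Status update for connector: " + connectorName);
        }
    }
}
```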
## Configuring the Config Store

KafkaConfigBackingStore also follows the same pattern; its topic defaults to connect-configs. The connector and task configurations are kept in:
```java
private final Map<String, Integer> connectorTaskCounts = new HashMap<>();
private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
```
Its callback is somewhat more involved. When it fires, KafkaConfigBackingStore first inspects the record to determine what kind of update it carries, updates its local cache, and then notifies the DistributedHerder by calling the corresponding onXXX method on the herder's ConfigUpdateListener:
```java
private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
    @Override
    public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
        if (error != null) {
            log.error("Unexpected in consumer callback for KafkaConfigBackingStore: ", error);
            return;
        }

        final SchemaAndValue value;
        try {
            value = converter.toConnectData(topic, record.value());
        } catch (DataException e) {
            log.error("Failed to convert config data to Kafka Connect format: ", e);
            return;
        }
        // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
        // *next record*, not the last one consumed.
        offset = record.offset() + 1;

        if (record.key().startsWith(TARGET_STATE_PREFIX)) {
            ......
            if (started && !removed)
                updateListener.onConnectorTargetStateChange(connectorName);
        } else if (record.key().startsWith(CONNECTOR_PREFIX)) {
            ......
            if (started) {
                if (removed)
                    updateListener.onConnectorConfigRemove(connectorName);
                else
                    updateListener.onConnectorConfigUpdate(connectorName);
            }
        } else if (record.key().startsWith(TASK_PREFIX)) {
            ......
        } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
            ......
            if (started)
                updateListener.onTaskConfigUpdate(updatedTasks);
        } else if (record.key().equals(SESSION_KEY_KEY)) {
            ......
            if (started)
                updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
        } else {
            log.error("Discarding config update record with invalid key: {}", record.key());
        }
    }
}
```
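The update-cache-then-notify structure is the classic listener pattern. A stripped-down sketch of the connector-config branch, with illustrative names rather than the actual Connect types:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the config store's update listener interface.
interface UpdateListener {
    void onConnectorConfigUpdate(String connectorName);
    void onConnectorConfigRemove(String connectorName);
}

public class ConfigStoreSketch {
    private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
    private UpdateListener updateListener;

    public void setUpdateListener(UpdateListener listener) {
        this.updateListener = listener;
    }

    // Simplified analogue of the CONNECTOR_PREFIX branch above: update the
    // local cache first, then notify the herder.
    void onRecord(String connectorName, Map<String, String> config) {
        if (config == null) {
            connectorConfigs.remove(connectorName);
            updateListener.onConnectorConfigRemove(connectorName);
        } else {
            connectorConfigs.put(connectorName, config);
            updateListener.onConnectorConfigUpdate(connectorName);
        }
    }
}
```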
## Initializing the Herder

The DistributedHerder constructor receives, and holds directly or indirectly, the objects created earlier. It also creates a ConfigUpdateListener and hands it to the configBackingStore; this is the config-update notification path described above:
```java
configBackingStore.setUpdateListener(new ConfigUpdateListener());
```
## Starting Connect

The Connect constructor receives the Herder and RestServer objects. When start is invoked, it first starts the DistributedHerder and then calls RestServer.initializeResources to bind the herder to the REST server:
```java
public void start() {
    try {
        log.info("Kafka Connect starting");
        Exit.addShutdownHook("connect-shutdown-hook", shutdownHook);

        herder.start();
        rest.initializeResources(herder);

        log.info("Kafka Connect started");
    } finally {
        startLatch.countDown();
    }
}
```
Since DistributedHerder implements Runnable, start submits the herder itself to an executor so that it runs on its own thread:
```java
@Override
public void start() {
    this.herderExecutor.submit(this);
}

@Override
public void run() {
    try {
        log.info("Herder starting");

        startServices();

        log.info("Herder started");

        while (!stopping.get()) {
            tick();
        }

        halt();

        log.info("Herder stopped");
        herderMetrics.close();
    } catch (Throwable t) {
        log.error("Uncaught exception in herder work thread, exiting: ", t);
        Exit.exit(1);
    }
}
```
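The shape here is a single-threaded event loop: one executor thread owns all herder state and spins on tick() until asked to stop. A self-contained sketch of that pattern (tick() just sleeps here to stay runnable; class and method names are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class TickLoopSketch implements Runnable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    public void start() {
        executor.submit(this); // same move as DistributedHerder.start()
    }

    @Override
    public void run() {
        while (!stopping.get()) {
            tick();
        }
    }

    private void tick() {
        // DistributedHerder's tick() handles rebalances, config updates, and
        // queued requests; this placeholder just sleeps.
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            stopping.set(true);
        }
    }

    public void stop() {
        stopping.set(true);
        executor.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        TickLoopSketch herder = new TickLoopSketch();
        herder.start();
        Thread.sleep(500);
        herder.stop();
    }
}
```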
startServices starts the worker, the status store, and the config store in turn; the offset store is started inside Worker.start:
```java
protected void startServices() {
    this.worker.start();
    this.statusBackingStore.start();
    this.configBackingStore.start();
}
```
RestServer.initializeResources binds the REST paths "/", "/connectors", and "/connector-plugins" to the herder by calling ResourceConfig.register:
```java
resourceConfig.register(new RootResource(herder));
resourceConfig.register(new ConnectorsResource(herder, config));
resourceConfig.register(new ConnectorPluginsResource(herder));
```
The RestServer then applies request logging, CORS, and other settings according to the configuration, and at this point Kafka Connect is started.
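As a quick check that the registered resources are live, here is a minimal HTTP probe against a running worker (assuming the default listener on localhost:8083):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RestProbeExample {
    public static void main(String[] args) throws Exception {
        // GET /connectors is served by the ConnectorsResource registered above.
        URL url = new URL("http://localhost:8083/connectors");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // JSON array of connector names
            }
        }
    }
}
```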