
Kafka Distributed Connect Startup Process

Loading Plugins

The Plugins object is created by calling the Plugins constructor:

Plugins plugins = new Plugins(workerProps);

The constructor reads the plugin.path configuration, which lists the directories plugins are loaded from:

List<String> pluginLocations = WorkerConfig.pluginLocations(props);

public static List<String> pluginLocations(Map<String, String> props) {
    String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
    return locationList == null
            ? new ArrayList<String>()
            : Arrays.asList(COMMA_WITH_WHITESPACE.split(locationList.trim(), -1));
}
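
To make the splitting concrete, here is a minimal sketch; COMMA_WITH_WHITESPACE below is an assumed stand-in for the constant in WorkerConfig, not copied from it:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class PluginPathDemo {
    // Assumed stand-in for WorkerConfig.COMMA_WITH_WHITESPACE
    private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");

    public static void main(String[] args) {
        String pluginPath = "/usr/local/share/kafka/plugins, /opt/connectors";
        List<String> locations = Arrays.asList(COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1));
        // Prints: [/usr/local/share/kafka/plugins, /opt/connectors]
        System.out.println(locations);
    }
}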

Next, the paths obtained above are used to initialize the DelegatingClassLoader, whose initLoaders method is then called to load the plugins:

delegatingLoader = newDelegatingClassLoader(pluginLocations);
delegatingLoader.initLoaders();

In initLoaders, initPluginLoader is called first for each path to set up the plugin loaders, and then addAllAliases registers aliases for the plugins:

for (String configPath : pluginPaths) {
    initPluginLoader(configPath);
}
// Finally add parent/system loader.
initPluginLoader(CLASSPATH_NAME);
addAllAliases();

In initPluginLoader, paths other than CLASSPATH_NAME first go through registerPlugin, which initializes a PluginClassLoader; after that they are handled the same way as the CLASSPATH_NAME path. The call chain is scanUrlsAndAddPlugins -> scanPluginPath -> getPluginDesc / getServiceLoaderPluginDesc, which discovers implementations of Connector, Converter, HeaderConverter, Transformation, ConfigProvider, ConnectRestExtension, ConnectorClientConfigOverridePolicy and similar classes or interfaces on the path; JDBC drivers are then loaded via scanUrlsAndAddPlugins -> loadJdbcDrivers.

private void scanUrlsAndAddPlugins(
        ClassLoader loader,
        URL[] urls,
        Path pluginLocation
) throws InstantiationException, IllegalAccessException {
    PluginScanResult plugins = scanPluginPath(loader, urls);
    log.info("Registered loader: {}", loader);
    if (!plugins.isEmpty()) {
        ......
    }
    // Load JDBC drivers
    loadJdbcDrivers(loader);
}

private PluginScanResult scanPluginPath(
        ClassLoader loader,
        URL[] urls
) throws InstantiationException, IllegalAccessException {
    ConfigurationBuilder builder = new ConfigurationBuilder();
    builder.setClassLoaders(new ClassLoader[]{loader});
    builder.addUrls(urls);
    builder.setScanners(new SubTypesScanner());
    builder.useParallelExecutor();
    Reflections reflections = new InternalReflections(builder);

    return new PluginScanResult(
            getPluginDesc(reflections, Connector.class, loader),
            getPluginDesc(reflections, Converter.class, loader),
            getPluginDesc(reflections, HeaderConverter.class, loader),
            getPluginDesc(reflections, Transformation.class, loader),
            getServiceLoaderPluginDesc(ConfigProvider.class, loader),
            getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
            getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
    );
}

private <T> Collection<PluginDesc<T>> getPluginDesc(
        Reflections reflections,
        Class<T> klass,
        ClassLoader loader
) throws InstantiationException, IllegalAccessException {
    Set<Class<? extends T>> plugins;
    try {
        plugins = reflections.getSubTypesOf(klass);
    } catch (ReflectionsException e) {
        log.debug("Reflections scanner could not find any classes for URLs: " +
                reflections.getConfiguration().getUrls(), e);
        return Collections.emptyList();
    }
    ......
    return result;
}

private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
    ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
    Collection<PluginDesc<T>> result = new ArrayList<>();
    try {
        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
        for (T pluginImpl : serviceLoader) {
            result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(),
                    versionFor(pluginImpl), loader));
        }
    } finally {
        Plugins.compareAndSwapLoaders(savedLoader);
    }
    return result;
}
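
To illustrate what the aliases registered by addAllAliases are for: a plugin can be referenced in configs by its simple class name, and usually also by the name with a well-known suffix such as Connector stripped. A rough sketch, using a hypothetical simpleAliases helper that is not part of the Kafka code:

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class AliasDemo {
    // Hypothetical helper: derive the names a plugin class could be registered under.
    static Set<String> simpleAliases(Class<?> pluginClass) {
        String simple = pluginClass.getSimpleName();          // e.g. FileStreamSourceConnector
        Set<String> aliases = new LinkedHashSet<>();
        aliases.add(simple);
        for (String suffix : Arrays.asList("Connector", "Converter", "Transformation")) {
            if (simple.endsWith(suffix) && simple.length() > suffix.length()) {
                aliases.add(simple.substring(0, simple.length() - suffix.length())); // e.g. FileStreamSource
            }
        }
        return aliases;
    }

    public static void main(String[] args) {
        class FileStreamSourceConnector {}                     // dummy class for illustration
        // Prints: [FileStreamSourceConnector, FileStreamSource]
        System.out.println(simpleAliases(FileStreamSourceConnector.class));
    }
}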

Instantiating the Config

Loading the config is much simpler: ultimately ConfigDef.parse converts the supplied Map into typed key-value pairs matching the defined config keys.
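
As an illustration of that conversion, a minimal sketch with made-up definitions (the real WorkerConfig/DistributedConfig define far more keys):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;

public class ConfigParseDemo {
    public static void main(String[] args) {
        // Illustrative definitions only
        ConfigDef def = new ConfigDef()
                .define("offset.storage.topic", ConfigDef.Type.STRING, "connect-offsets",
                        ConfigDef.Importance.HIGH, "Topic for connector offsets")
                .define("offset.flush.interval.ms", ConfigDef.Type.LONG, 60000L,
                        ConfigDef.Importance.LOW, "Offset flush interval");

        Map<String, String> raw = new HashMap<>();
        raw.put("offset.flush.interval.ms", "10000");

        Map<String, Object> parsed = def.parse(raw);
        // The string "10000" has been converted to a Long; the topic falls back to its default.
        System.out.println(parsed.get("offset.flush.interval.ms")); // 10000
        System.out.println(parsed.get("offset.storage.topic"));     // connect-offsets
    }
}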

Cluster ID

To obtain the cluster id, a KafkaAdminClient is first created via KafkaAdminClient.createInternal:

Admin adminClient = Admin.create(config.originals());

static Admin create(Map<String, Object> conf) {
    return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
}

Then KafkaAdminClient.describeCluster().clusterId() is called to get the cluster id; the value is actually filled in by the callback that handles the metadata response from Kafka:

void handleResponse(AbstractResponse abstractResponse) {
    MetadataResponse response = (MetadataResponse) abstractResponse;
    describeClusterFuture.complete(response.brokers());
    controllerFuture.complete(controller(response));
    clusterIdFuture.complete(response.clusterId());
    authorizedOperationsFuture.complete(
            validAclOperations(response.clusterAuthorizedOperations()));
}
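
For reference, the same cluster id can be fetched directly through the public Admin API; a minimal sketch with a placeholder bootstrap address:

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterIdDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // describeCluster() returns futures; clusterId() is completed from the metadata response
            String clusterId = admin.describeCluster().clusterId().get();
            System.out.println("Kafka cluster id: " + clusterId);
        }
    }
}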

Initializing the REST Server

First the config is passed into the RestServer constructor, which builds the listener addresses from the configured hostname and port and reads the admin listener addresses from the config:

RestServer rest = new RestServer(config);

// RestServer constructor
List<String> listeners = parseListeners();
List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
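
A listener value looks like http://localhost:8083. Below is a minimal sketch of pulling the protocol, hostname and port out of such a string; the regex is a simplified stand-in for the real LISTENER_PATTERN in RestServer:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ListenerParseDemo {
    // Simplified stand-in for RestServer.LISTENER_PATTERN
    private static final Pattern LISTENER = Pattern.compile("^(https?)://([^:]*):(\\d+)$");

    public static void main(String[] args) {
        Matcher m = LISTENER.matcher("http://localhost:8083");
        if (m.matches()) {
            System.out.println("protocol=" + m.group(1)); // http
            System.out.println("hostname=" + m.group(2)); // localhost
            System.out.println("port=" + m.group(3));     // 8083
        }
    }
}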

Next, the Jetty Server object and the ContextHandlerCollection are created:

jettyServer = new Server();
handlers = new ContextHandlerCollection();

Then a Connector is created for each listener and admin listener and added to the Jetty server. If the protocol is HTTPS, the SSL/TLS context is built from the configuration; if the configuration contains no SSL settings, a default SSL/TLS context is used.

createConnectors(listeners, adminListeners);

public Connector createConnector(String listener, boolean isAdmin) {
    Matcher listenerMatcher = LISTENER_PATTERN.matcher(listener);

    if (!listenerMatcher.matches())
        throw new ConfigException("Listener doesn't have the right format (protocol://hostname:port).");

    String protocol = listenerMatcher.group(1).toLowerCase(Locale.ENGLISH);

    if (!PROTOCOL_HTTP.equals(protocol) && !PROTOCOL_HTTPS.equals(protocol))
        throw new ConfigException(String.format("Listener protocol must be either \"%s\" or \"%s\".", PROTOCOL_HTTP, PROTOCOL_HTTPS));

    String hostname = listenerMatcher.group(2);
    int port = Integer.parseInt(listenerMatcher.group(3));

    ServerConnector connector;

    if (PROTOCOL_HTTPS.equals(protocol)) {
        SslContextFactory ssl;
        if (isAdmin) {
            ssl = SSLUtils.createServerSideSslContextFactory(config, ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX);
        } else {
            ssl = SSLUtils.createServerSideSslContextFactory(config);
        }
        connector = new ServerConnector(jettyServer, ssl);
        if (!isAdmin) {
            connector.setName(String.format("%s_%s%d", PROTOCOL_HTTPS, hostname, port));
        }
    } else {
        connector = new ServerConnector(jettyServer);
        if (!isAdmin) {
            connector.setName(String.format("%s_%s%d", PROTOCOL_HTTP, hostname, port));
        }
    }

    if (isAdmin) {
        connector.setName(ADMIN_SERVER_CONNECTOR_NAME);
    }

    if (!hostname.isEmpty())
        connector.setHost(hostname);

    connector.setPort(port);

    return connector;
}

Finally, initializeServer is called to set up the static resource handlers and start the Jetty server.
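
For orientation, a stripped-down sketch of the Server/Connector/Handler wiring described above; this is not the actual RestServer code, and the handler collection is left empty:

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;

public class MiniRestServer {
    public static void main(String[] args) throws Exception {
        Server jettyServer = new Server();
        ContextHandlerCollection handlers = new ContextHandlerCollection();

        // One plain HTTP connector, analogous to a listener of http://0.0.0.0:8083
        ServerConnector connector = new ServerConnector(jettyServer);
        connector.setPort(8083);
        jettyServer.addConnector(connector);

        jettyServer.setHandler(handlers);
        jettyServer.start();   // RestServer registers its resources around this point
        jettyServer.join();
    }
}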

Configuring the Offset Store

KafkaOffsetBackingStore stores the offsets of the topics consumed by connectors in a Kafka topic. The topic name comes from the offset.storage.topic config key and defaults to connect-offsets. KafkaOffsetBackingStore creates a producer to write offset updates to the topic, and a consumer to pick up offsets written by other Connect workers. The latest offsets are held in a HashMap named data:

private HashMap<ByteBuffer, ByteBuffer> data;

The callback that receives offset updates is:

private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
    @Override
    public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
        ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
        data.put(key, value);
    }
};
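
To make the materialization step concrete, here is a simplified sketch of how the latest offsets could be read into such a map: a consumer reads the compacted offsets topic from the beginning and the last value seen per key wins. The real store does this through KafkaBasedLog; the bootstrap address and group id below are placeholders.

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetMaterializeDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "offset-demo");             // placeholder
        props.put("auto.offset.reset", "earliest");

        Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.subscribe(Collections.singletonList("connect-offsets"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
                ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
                data.put(key, value); // the last write for a key wins
            }
        }
        System.out.println("Materialized " + data.size() + " offset entries");
    }
}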

Instantiating the Worker

The Worker owns all Connector and Task instances on the node; creating, starting, and stopping connectors all goes through the Worker. The Worker constructor contains no special logic; it mainly holds references to externally created objects.

Configuring the Status Store

KafkaStatusBackingStore follows the same pattern as the offset store; the status topic defaults to connect-status. It maintains the status of connectors, tasks, and topics:

protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;

Its read callback is:

Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
    @Override
    public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
        read(record);
    }
};

void read(ConsumerRecord<String, byte[]> record) {
    String key = record.key();
    if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
        readConnectorStatus(key, record.value());
    } else if (key.startsWith(TASK_STATUS_PREFIX)) {
        readTaskStatus(key, record.value());
    } else if (key.startsWith(TOPIC_STATUS_PREFIX)) {
        readTopicStatus(key, record.value());
    } else {
        log.warn("Discarding record with invalid key {}", key);
    }
}
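
The record keys encode which entity the status belongs to. A small sketch of that routing, assuming prefix values of the form status-connector-, status-task-, and status-topic- (treat these literals as assumptions rather than quoted constants):

public class StatusKeyDemo {
    // Assumed prefix values, mirroring KafkaStatusBackingStore
    static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
    static final String TASK_STATUS_PREFIX = "status-task-";
    static final String TOPIC_STATUS_PREFIX = "status-topic-";

    static String route(String key) {
        if (key.startsWith(CONNECTOR_STATUS_PREFIX))
            return "connector status for " + key.substring(CONNECTOR_STATUS_PREFIX.length());
        if (key.startsWith(TASK_STATUS_PREFIX))
            return "task status for " + key.substring(TASK_STATUS_PREFIX.length());
        if (key.startsWith(TOPIC_STATUS_PREFIX))
            return "topic status for " + key.substring(TOPIC_STATUS_PREFIX.length());
        return "invalid key";
    }

    public static void main(String[] args) {
        System.out.println(route("status-connector-my-jdbc-source")); // connector status for my-jdbc-source
        System.out.println(route("status-task-my-jdbc-source-0"));    // task status for my-jdbc-source-0
    }
}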

Configuring the Config Store

KafkaConfigBackingStore again works like the status store; the default topic name is connect-configs. The connector and task configurations are held in:

private final Map<String, Integer> connectorTaskCounts = new HashMap<>();
private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();

Its callback is a little more involved: when a record arrives, KafkaConfigBackingStore first inspects the record to determine what kind of update it is, updates its local cache, and then notifies the DistributedHerder by calling the appropriate onXXX method on the herder's ConfigUpdateListener.

private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
    @Override
    public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
        if (error != null) {
            log.error("Unexpected in consumer callback for KafkaConfigBackingStore: ", error);
            return;
        }

        final SchemaAndValue value;
        try {
            value = converter.toConnectData(topic, record.value());
        } catch (DataException e) {
            log.error("Failed to convert config data to Kafka Connect format: ", e);
            return;
        }
        // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
        // *next record*, not the last one consumed.
        offset = record.offset() + 1;

        if (record.key().startsWith(TARGET_STATE_PREFIX)) {
            ......
            if (started && !removed)
                updateListener.onConnectorTargetStateChange(connectorName);
        } else if (record.key().startsWith(CONNECTOR_PREFIX)) {
            ......
            if (started) {
                if (removed)
                    updateListener.onConnectorConfigRemove(connectorName);
                else
                    updateListener.onConnectorConfigUpdate(connectorName);
            }
        } else if (record.key().startsWith(TASK_PREFIX)) {
            ......
        } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
            ......
            if (started)
                updateListener.onTaskConfigUpdate(updatedTasks);
        } else if (record.key().equals(SESSION_KEY_KEY)) {
            ......
            if (started)
                updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
        } else {
            log.error("Discarding config update record with invalid key: {}", record.key());
        }
    }
}

Initializing the Herder

In the DistributedHerder constructor, the objects created earlier are passed in and held, directly or indirectly, by the herder; at the same time a ConfigUpdateListener is created and handed to the configBackingStore, which drives the config update logic described above:

configBackingStore.setUpdateListener(new ConfigUpdateListener());

Starting Connect

The Connect constructor receives the Herder and the RestServer. When start is executed, it first calls DistributedHerder.start and then RestServer.initializeResources to bind the herder to the REST server:

public void start() {
    try {
        log.info("Kafka Connect starting");
        Exit.addShutdownHook("connect-shutdown-hook", shutdownHook);

        herder.start();
        rest.initializeResources(herder);

        log.info("Kafka Connect started");
    } finally {
        startLatch.countDown();
    }
}

Since DistributedHerder implements Runnable, start submits the herder itself to its executor, so it runs on its own thread:

@Override
public void start() {
    this.herderExecutor.submit(this);
}

@Override
public void run() {
    try {
        log.info("Herder starting");

        startServices();

        log.info("Herder started");

        while (!stopping.get()) {
            tick();
        }

        halt();

        log.info("Herder stopped");
        herderMetrics.close();
    } catch (Throwable t) {
        log.error("Uncaught exception in herder work thread, exiting: ", t);
        Exit.exit(1);
    }
}

startServices starts the worker, the status store, and the config store in turn; the offset store is started inside Worker.start:

protected void startServices() {
    this.worker.start();
    this.statusBackingStore.start();
    this.configBackingStore.start();
}

In RestServer.initializeResources, the REST paths "/", "/connectors", and "/connector-plugins" are bound to the herder by registering the corresponding resources with the ResourceConfig:

resourceConfig.register(new RootResource(herder));
resourceConfig.register(new ConnectorsResource(herder, config));
resourceConfig.register(new ConnectorPluginsResource(herder));

After that, request logging, CORS, and a few other REST server settings are applied from the configuration, and at this point Kafka Connect has started.
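
As a quick sanity check once the worker is up, the root resource can be queried; a minimal sketch using Java's HttpClient against a local worker on the default distributed REST port 8083 (adjust to your listeners setting):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectRestCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8083/")).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Expect a small JSON document with the Connect version and the Kafka cluster id
        System.out.println(response.statusCode() + " " + response.body());
    }
}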