
Install from binary packages

Install from a tarball

wget https://github.com/Kitware/CMake/releases/download/v3.24.1/cmake-3.24.1-linux-x86_64.tar.gz
tar -zxvf cmake-3.24.1-linux-x86_64.tar.gz && cd cmake-3.24.1-linux-x86_64
sudo cp bin/* /usr/bin/
sudo cp -r doc/ /usr/share/
sudo cp -r share/aclocal /usr/share/
sudo cp -r share/applications /usr/share/
sudo cp -r share/bash-completion /usr/share/
sudo cp -r share/cmake-3.24 /usr/share/
sudo cp -r share/emacs /usr/share/
sudo cp -r share/icons /usr/share/
sudo cp -r share/mime /usr/share/

Install via the .sh installer

The .sh installer is similar to the tarball method, except that extraction involves a few interactive prompts:

wget https://github.com/Kitware/CMake/releases/download/v3.24.1/cmake-3.24.1-linux-x86_64.sh
sh cmake-3.24.1-linux-x86_64.sh
cd cmake-3.24.1-linux-x86_64
sudo cp bin/* /usr/bin/
sudo cp -r doc/ /usr/share/
sudo cp -r share/aclocal /usr/share/
sudo cp -r share/applications /usr/share/
sudo cp -r share/bash-completion /usr/share/
sudo cp -r share/cmake-3.24 /usr/share/
sudo cp -r share/emacs /usr/share/
sudo cp -r share/icons /usr/share/
sudo cp -r share/mime /usr/share/

Build and install from source

wget https://github.com/Kitware/CMake/releases/download/v3.24.1/cmake-3.24.1.tar.gz
tar -zxvf cmake-3.24.1.tar.gz && cd cmake-3.24.1
./bootstrap && make && sudo make install

Install from the Kitware APT repository

wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc 2>/dev/null | gpg --dearmor - | sudo tee /usr/share/keyrings/kitware-archive-keyring.gpg >/dev/null
echo 'deb [signed-by=/usr/share/keyrings/kitware-archive-keyring.gpg] https://apt.kitware.com/ubuntu/ bionic main' | sudo tee /etc/apt/sources.list.d/kitware.list >/dev/null
sudo apt-get update
sudo apt-get install cmake

Install via pip

pip install cmake

Install build dependencies

Building PostgreSQL requires a compiler and supporting libraries; see Requirements in the official documentation.

sudo apt install gcc make libreadline-dev zlib1g-dev openssl flex bison perl

Building the documentation requires these additional packages:

apt-get install docbook-xml docbook-xsl fop libxml2-utils xsltproc

Get the source

1. Download the source tarball

Open https://www.postgresql.org/ftp/source/ (the latest version at the time of writing is 14.5), download the tarball, and extract it:

tar -zxvf postgresql-14.5.tar.gz
cd postgresql-14.5

2. Clone the repository with git and check out a stable branch

git clone http://git.postgresql.org/git/postgresql.git
cd postgresql
git checkout -b REL_14_STABLE origin/REL_14_STABLE

Compile and install

Before compiling, choose an installation directory. The default is /usr/local/pgsql/; it can be overridden with the --prefix configure option, e.g. ${HOME}/pgsql. See the official documentation for the full list of configure options.

export PGHOME=${HOME}/pgsql
./configure --prefix=${PGHOME} --enable-tap-tests --enable-cassert --enable-debug
make
make install

Set environment variables

export PGDATA=${PGHOME}/pgdata
export PATH=${PGHOME}/bin:$PATH
export LD_LIBRARY_PATH=${PGHOME}/lib:$LD_LIBRARY_PATH
cd ${PGHOME}

Initialize and start

initdb -D $PGDATA
pg_ctl -D $PGDATA -l logfile start
createdb test
psql test

Install iptables on Ubuntu

sudo apt-get install -y iptables

Common options

  • -A, --append: add a rule to a chain (at the end).
  • -D, --delete: remove specified rules from a chain.
  • -F, --flush: remove all rules.
  • -I, --insert: add a rule to a chain at a given position.
  • -L, --list: show all rules in a chain.
  • -N, --new-chain: create a new chain.
  • -v, --verbose: show more information when using a list option.
  • -n, --numeric: numeric output of addresses and ports.
  • --line-numbers: print line numbers when listing.
  • -p, --protocol: protocol, by number or name, e.g. 'tcp'.
  • -s, --source: source specification.
  • -d, --destination: destination specification.
  • -j, --jump: target for rule (may load target extension).

    Show existing rules

    sudo iptables -L -n --line-number

    Add a rule

    The following command adds a DOCKER chain rule allowing 172.20.0.3 to access TCP port 2022 on 172.20.0.2:
    sudo iptables -I DOCKER -s 172.20.0.3 -d 172.20.0.2 -p tcp --dport 2022 -j ACCEPT
    The following command adds an INPUT chain rule blocking access to port 3306 from all addresses:
    sudo iptables -I INPUT -p tcp --dport 3306 -j DROP

    Delete a rule

    The following command deletes rule number 1 from the INPUT chain:
    sudo iptables -D INPUT 1

The following commands force-finalize a namespace (here knative-serving) that is stuck in the Terminating state: export the namespace as JSON, remove the entries under spec.finalizers in the editor, then PUT the edited object to the namespace's finalize endpoint ($TOKEN is obtained at the end of this section):

kubectl get ns knative-serving -o json > knative-serving.json

sudo vim knative-serving.json

curl -H "Authorization: Bearer $TOKEN" -H "Content-Type:application/json" -X PUT --data-binary @knative-serving.json https://127.0.0.1:6443/api/v1/namespaces/knative-serving/finalize --insecure

Alternatively, go through kubectl proxy, which handles authentication for you (the curl port must match the proxy port):

kubectl proxy --port=8081

curl -k -H "Content-Type:application/json" -X PUT --data-binary @knative-serving.json http://127.0.0.1:8081/api/v1/namespaces/knative-serving/finalize

The same PUT with the bearer token inlined, plus a quick token check against /api/:

curl -H "Content-Type:application/json" -H "Authorization: Bearer eyJhbGciOiJSUzI1NiIsImtpZCI6Ik84RHlkUjZBTHpwQ2JUT0N6ajlMUUtGYnJQV3dZeU9aVEsyaklIeHJPVncifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi11c2VyLXRva2VuLWR3NHRmIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImFkbWluLXVzZXIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiI5MTdhZTA4ZS04NWY3LTQ5ZjMtOTc2OC02NGM0NThlOTZiMTIiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06YWRtaW4tdXNlciJ9.qCD-BCP_eVXj_KfDrta2jfFniQCu06rRJsk5H0W1jpOt2XAIDe7uadAT_hQA74wmc278-ASLS_sq_7VZ_lV9Qpm-NGyUXI4n1aIEg-Axc1yZK422i4_DlEO18IB7pjADLYD9bfeYgxR3QJeIAiRN9T_qZdHoubIJQQ6XzhAqzQhmq9aQXOHcez67EBuajpqMWAfTchfriO8Z3YUWvgXprltp0NOUc9enhF0-F3ZirV4w9pJn7uHXCgUJjBBYM2rDbJzcHOfnWr_2OAS6uy5I85biuH8zoJ8Xh7qlP_1CKsYIOt7_cVZnrGGKMSVIGtmzDGiLBaZKVDrreP5gGm_1DQ" -X PUT --data-binary @knative-serving.json https://127.0.0.1:6443/api/v1/namespaces/knative-serving/finalize --insecure

curl -H "Authorization: Bearer eyJhbGciOiJSUzI1NiIsImtpZCI6Ik84RHlkUjZBTHpwQ2JUT0N6ajlMUUtGYnJQV3dZeU9aVEsyaklIeHJPVncifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi11c2VyLXRva2VuLWR3NHRmIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImFkbWluLXVzZXIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiI5MTdhZTA4ZS04NWY3LTQ5ZjMtOTc2OC02NGM0NThlOTZiMTIiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06YWRtaW4tdXNlciJ9.qCD-BCP_eVXj_KfDrta2jfFniQCu06rRJsk5H0W1jpOt2XAIDe7uadAT_hQA74wmc278-ASLS_sq_7VZ_lV9Qpm-NGyUXI4n1aIEg-Axc1yZK422i4_DlEO18IB7pjADLYD9bfeYgxR3QJeIAiRN9T_qZdHoubIJQQ6XzhAqzQhmq9aQXOHcez67EBuajpqMWAfTchfriO8Z3YUWvgXprltp0NOUc9enhF0-F3ZirV4w9pJn7uHXCgUJjBBYM2rDbJzcHOfnWr_2OAS6uy5I85biuH8zoJ8Xh7qlP_1CKsYIOt7_cVZnrGGKMSVIGtmzDGiLBaZKVDrreP5gGm_1DQ" https://127.0.0.1:6443/api/

Extract the admin service-account token:

kubectl describe secrets $(kubectl get secrets -n kube-system |grep admin |cut -f1 -d ' ') -n kube-system |grep -E '^token' |cut -f2 -d':'|tr -d '\t'|tr -d ' '

TOKEN=$(kubectl describe secrets $(kubectl get secrets -n kube-system |grep admin |cut -f1 -d ' ') -n kube-system |grep -E '^token' |cut -f2 -d':'|tr -d '\t'|tr -d ' ')

Remove any previous Go installation

sudo rm -rf /usr/local/go

Download

wget https://golang.google.cn/dl/go1.18.linux-amd64.tar.gz

Extract the archive

sudo tar -zxvf go1.18.linux-amd64.tar.gz -C /usr/local

Create GOPATH

mkdir -p ~/workspace/go/bin ~/workspace/go/src ~/workspace/go/pkg

Set up the Go environment

tee -a $HOME/.bashrc <<-'EOF'
export GOROOT=/usr/local/go
export GOPATH=~/workspace/go
export GO111MODULE="on"
export GOPROXY=https://proxy.golang.com.cn,direct
export PATH=$PATH:$GOROOT/bin:$GOPATH/bin
EOF
source $HOME/.bashrc

Configure the environment on all nodes

Letting iptables see bridged traffic

cat <<EOF | sudo tee /etc/modules-load.d/k8s.conf
br_netfilter
EOF

cat <<EOF | sudo tee /etc/sysctl.d/k8s.conf
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
net.ipv4.ip_forward=1
EOF
sudo sysctl --system

Turn off firewall

sudo systemctl stop ufw
sudo systemctl disable ufw

Turn off swap

sudo swapoff -a && sudo sysctl -w vm.swappiness=0
sudo sed -ri '/^[^#]*swap/s@^@#@' /etc/fstab

Install docker

Please refer to How To Install Docker and Docker Compose on Ubuntu.

Change docker cgroup driver to “systemd”

sudo vi /etc/docker/daemon.json

Add content

{
"exec-opts": ["native.cgroupdriver=systemd"]
}

Restart docker

sudo systemctl restart docker
sudo systemctl enable docker

Install K8S

sudo curl -s https://mirrors.aliyun.com/kubernetes/apt/doc/apt-key.gpg | sudo apt-key add -
sudo tee /etc/apt/sources.list.d/kubernetes.list <<-'EOF'
deb https://mirrors.aliyun.com/kubernetes/apt kubernetes-xenial main
EOF
sudo apt-get update
sudo apt-get install -y kubeadm kubectl kubelet

Init K8S on master

sudo kubeadm init --image-repository registry.aliyuncs.com/google_containers --pod-network-cidr=10.244.0.0/16

Once K8S initializes successfully, you will see output like this:

To start using your cluster, you need to run the following as a regular user:

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

Alternatively, if you are the root user, you can run:

export KUBECONFIG=/etc/kubernetes/admin.conf

You should now deploy a pod network to the cluster.
Run "kubectl apply -f [podnetwork].yaml" with one of the options listed at:
https://kubernetes.io/docs/concepts/cluster-administration/addons/

Then you can join any number of worker nodes by running the following on each as root:

kubeadm join 192.168.2.8:6443 --token 8borel.phprbzxwjupl3cuy \
--discovery-token-ca-cert-hash sha256:c441bbee5846711e3c00260e6daebc31258fcf8b9dbf7bb7b8b3b2c10f6ffbdf

Create a new token with the following command:

kubeadm token create --print-join-command

Copy config file to HOME folder

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

Install flannel

curl https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml -o kube-flannel.yaml

kubectl apply -f kube-flannel.yaml

Check node status

kubectl get nodes -o wide

Add nodes to cluster

Run the following command on each worker node:

kubeadm join 192.168.2.8:6443 --token 8borel.phprbzxwjupl3cuy \
--discovery-token-ca-cert-hash sha256:c441bbee5846711e3c00260e6daebc31258fcf8b9dbf7bb7b8b3b2c10f6ffbdf

Configure GPU nodes

If your worker nodes are GPU servers, you should install nvidia-docker2 on them:

distribution=$(. /etc/os-release;echo $ID$VERSION_ID) \
&& curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add - \
&& curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list
sudo apt-get update
sudo apt-get install -y nvidia-docker2

Change container default runtime

sudo vi /etc/docker/daemon.json

Change default docker runtime to nvidia

{
"exec-opts": ["native.cgroupdriver=systemd"],
"default-runtime": "nvidia",
"runtimes": {
"nvidia": {
"path": "nvidia-container-runtime",
"runtimeArgs": []
}
}
}

Restart docker service

sudo systemctl daemon-reload
sudo systemctl restart docker

Install nvidia device plugin on master

curl https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/master/nvidia-device-plugin.yml -o nvidia-device-plugin.yml
kubectl apply -f nvidia-device-plugin.yml

Enable kubectl bash completion and alias

source <(kubectl completion bash)
alias k=kubectl

Install KubeSphere

```
kubectl apply -f https://github.com/kubesphere/ks-installer/releases/download/v3.2.1/kubesphere-installer.yaml

kubectl apply -f https://github.com/kubesphere/ks-installer/releases/download/v3.2.1/cluster-configuration.yaml
```

The OPC DA Automation Interface is a set of interface specifications published by the OPC Foundation to simplify and unify OPC DA client development. The automation interface defines six kinds of objects: OPCServer, OPCBrowser, OPCGroups, OPCGroup, OPCItems, and OPCItem. The main functionality of each is described below.

The OPCServer object

A client connects to an OPC host through the OPCServer object, which is used to query host information and to create and operate on Group and Item objects. OPCServer has only a parameterless constructor. Its main methods and usage are as follows:

GetOPCServers(object Node)

Returns the list of OPC servers on the host; with an empty parameter it returns the local machine's server list:

string Host = "192.168.2.53";
OPCServer LocalOpcServer = new OPCServer(); // parameterless OPCServer constructor
//object ServerList = LocalOpcServer.GetOPCServers(); // get the local machine's server list
//object ServerList = LocalOpcServer.GetOPCServers(null); // get the local machine's server list
Array ServerList = LocalOpcServer.GetOPCServers(Host) as Array; // get the OPC server list on the remote host

foreach (string Server in ServerList)
{
Console.WriteLine("OPC server on host {0}: {1}", Host, Server);
}

Connect(string ProgID, object Node)

Connects to the OPC server whose program ID is ProgID on host Node:

LocalOpcServer.Connect(ServerList.GetValue(1).ToString(), Host);

QueryAvailableLocaleIDs()

Returns the list of available locale IDs on the OPC host:

Array LocaleIDs = LocalOpcServer.QueryAvailableLocaleIDs() as Array;
for(int i = 1; i <= LocaleIDs.Length; i++)
{
Console.WriteLine($"QueryAvailableLocaleIDs result index[{i}] value={LocaleIDs.GetValue(i)}");
}

QueryAvailableProperties(string ItemID, out int Count, out Array PropertyIDs, out Array Descriptions, out Array DataTypes)

Returns the list of properties available for the given ItemID:

int Count;
Array PropertyIDs;
Array Descriptions;
Array DataTypes;
LocalOpcServer.QueryAvailableProperties("Random.Int4", out Count, out PropertyIDs, out Descriptions, out DataTypes);
for (int i = 1; i <= PropertyIDs.Length; i++)
{
Console.WriteLine($"QueryAvailableProperties result index[{i}] PropertyID={PropertyIDs.GetValue(i)} Description={Descriptions.GetValue(i)} DataType={Descriptions.GetValue(i)}");
}

GetItemProperties(string ItemID, int Count, ref Array PropertyIDs, out Array PropertyValues, out Array Errors)

Queries the values of the specified properties for the given ItemID:

List<int> InPropertyIDs = new List<int>();
InPropertyIDs.Add(0); // the leading 0 is required (COM arrays are 1-based); omitting it causes an error
InPropertyIDs.Add(4);
Array PropertyValues;
Array Errors;
LocalOpcServer.GetItemProperties("Random.Int4", 1, InPropertyIDs.ToArray(), out PropertyValues, out Errors);
for (int i = 1; i <= PropertyValues.Length; i++)
{
Console.WriteLine($"GetItemProperties result index[{i}] value={PropertyValues.GetValue(i)}");
}

GetErrorString(int ErrorCode)

Returns the textual description for the given ErrorCode:

foreach (int error in Errors)
{
Console.WriteLine(LocalOpcServer.GetErrorString(error));
}

CreateBrowser()

Creates an OPCBrowser; through the OPCBrowser object you can obtain the OPC server's node tree:

OPCBrowser LocalOPCbrowser = LocalOpcServer.CreateBrowser();

Disconnect()

Disconnects from the OPC host:

LocalOpcServer.Disconnect();

The complete example:

string Host = "192.168.2.53";
OPCServer LocalOpcServer = new OPCServer(); // parameterless OPCServer constructor
//object ServerList = LocalOpcServer.GetOPCServers(); // get the local machine's server list
//object ServerList = LocalOpcServer.GetOPCServers(null); // get the local machine's server list
Array ServerList = LocalOpcServer.GetOPCServers(Host) as Array; // get the OPC server list on the remote host

foreach (string Server in ServerList)
{
Console.WriteLine("OPC server on host {0}: {1}", Host, Server);
}

LocalOpcServer.Connect(ServerList.GetValue(1).ToString(), Host);

Array LocaleIDs = LocalOpcServer.QueryAvailableLocaleIDs() as Array;
for(int i = 1; i <= LocaleIDs.Length; i++)
{
Console.WriteLine($"QueryAvailableLocaleIDs result index[{i}] value={LocaleIDs.GetValue(i)}");
}

int Count;
Array PropertyIDs;
Array Descriptions;
Array DataTypes;
LocalOpcServer.QueryAvailableProperties("Random.Int4", out Count, out PropertyIDs, out Descriptions, out DataTypes);
for (int i = 1; i <= PropertyIDs.Length; i++)
{
Console.WriteLine($"QueryAvailableProperties result index[{i}] PropertyID={PropertyIDs.GetValue(i)} Description={Descriptions.GetValue(i)} DataType={Descriptions.GetValue(i)}");
}

List<int> InPropertyIDs = new List<int>();
InPropertyIDs.Add(0); // the leading 0 is required (COM arrays are 1-based); omitting it causes an error
InPropertyIDs.Add(4);
Array PropertyValues;
Array Errors;
LocalOpcServer.GetItemProperties("Random.Int4", 1, InPropertyIDs.ToArray(), out PropertyValues, out Errors);
for (int i = 1; i <= PropertyValues.Length; i++)
{
Console.WriteLine($"GetItemProperties result index[{i}] value={PropertyValues.GetValue(i)}");
}

// GetErrorString: get the description for each error code
foreach (int error in Errors)
{
Console.WriteLine(LocalOpcServer.GetErrorString(error));
}

OPCBrowser LocalOPCbrowser = LocalOpcServer.CreateBrowser();

LocalOpcServer.Disconnect();

The OPCBrowser object

OPCBrowser is the tree structure in which the OPC server stores branch and leaf information. Common methods:

void ShowBranches() // list the sub-branches of the current branch
void ShowLeafs(object Flat) // list the leaf nodes; when Flat=True, all nodes under the current branch (including children) are returned flattened
void MoveUp() // move up one level
void MoveToRoot() // move to the root of the tree
void MoveDown(string Branch) // move down into the specified branch
string GetItemID(string Leaf) // get the item ID of a leaf
dynamic GetAccessPaths(string ItemID) // get the access paths of an item

Example code that walks the tree structure recursively:

OPCBrowser LocalOPCbrowser = LocalOpcServer.CreateBrowser();
ShowOpcTagsRecursive(LocalOPCbrowser, "");

// recursively print the tree
private static void ShowOpcTagsRecursive(OPCBrowser Browser, string prefix)
{
Browser.ShowBranches();

foreach (string item in Browser)
{
Console.WriteLine($"{prefix} branch {Browser.GetItemID(item)}");
Browser.MoveDown(item.ToString());
ShowOpcTagsRecursive(Browser, prefix + "\t");
Browser.MoveUp();
}

Browser.ShowLeafs();

foreach (string item in Browser)
{

Console.WriteLine($"{prefix} leaf {Browser.GetItemID(item)}");
}

}

The OPCGroups object

OPCGroups is the collection of OPCGroup objects. Through OPCGroups you can look up, add, and remove OPCGroup objects, and its properties supply default settings for newly created OPCGroups.

Common methods

OPCGroup Item(object ItemSpecifier) // get an OPCGroup by the given specifier
OPCGroup Add(object Name) // add an OPCGroup with the given name
OPCGroup GetOPCGroup(object ItemSpecifier) // get an OPCGroup by the given specifier
void RemoveAll() // remove all OPCGroups
void Remove(object ItemSpecifier) // remove the OPCGroup with the given name

Common properties

DefaultGroupIsActive // whether new groups are active by default
DefaultGroupUpdateRate // default update rate
DefaultGroupDeadband // default deadband; changes exceeding the deadband trigger the DataChange event
DefaultGroupTimeBias // default time bias

Code example

OPCGroups LocalGroups = LocalOpcServer.OPCGroups;
LocalGroups.DefaultGroupIsActive = true;
LocalGroups.DefaultGroupDeadband = 0;
LocalGroups.DefaultGroupUpdateRate = 1000;
LocalGroups.DefaultGroupTimeBias = 5000;

OPCGroup t = LocalGroups.Add("Test");
OPCGroup test = LocalGroups.Item("Test");
OPCGroup Test = LocalGroups.GetOPCGroup("Test");
LocalGroups.Remove("Test");
LocalGroups.RemoveAll();

The OPCGroup object

OPCGroup is one of the most frequently used objects in client code. It provides synchronous and asynchronous methods for reading and writing OPCItems, and its properties can override the defaults set on OPCGroups. Because reading and writing node data involves OPCGroup, OPCItems, and OPCItem together, the example code is given after OPCItem is introduced.

Common methods

SyncRead(short Source, int NumItems, ref Array ServerHandles, out Array Values, out Array Errors, out object Qualities, out object TimeStamps) // synchronous read; Source must be OPCAutomation.OPCDataSource.OPCCache or OPCAutomation.OPCDataSource.OPCDevice

SyncWrite(int NumItems, ref Array ServerHandles, ref Array Values, out Array Errors) // synchronous write

AsyncRead(int NumItems, ref Array ServerHandles, out Array Errors, int TransactionID, out int CancelID) // asynchronous read

AsyncWrite(int NumItems, ref Array ServerHandles, ref Array Values, out Array Errors, int TransactionID, out int CancelID) // asynchronous write

AsyncRefresh(short Source, int TransactionID, out int CancelID) // asynchronous refresh; Source must be OPCAutomation.OPCDataSource.OPCCache or OPCAutomation.OPCDataSource.OPCDevice

AsyncCancel(int CancelID) // cancel a pending asynchronous operation

Common properties

Name // group name
IsPublic // whether the group is public
IsActive // whether the group is active
IsSubscribed // whether the group is subscribed; DataChange fires only when this is true
ClientHandle // client handle
ServerHandle // server handle
TimeBias // time bias, corresponds to OPCGroups.DefaultGroupTimeBias
DeadBand // deadband, corresponds to OPCGroups.DefaultGroupDeadband
UpdateRate // update rate, corresponds to OPCGroups.DefaultGroupUpdateRate
OPCItems // the OPCItems collection

Events

DataChange // fired when the value or quality of any OPCItem in the group changes
AsyncReadComplete // async read completion event, paired with AsyncRead
AsyncWriteComplete // async write completion event, paired with AsyncWrite
AsyncCancelComplete // async cancel completion event, paired with AsyncCancel

The OPCItems object

OPCItems is also one of the most commonly used objects in client code; it provides the interface for adding and removing OPCItem objects.

Common methods

Item(object ItemSpecifier) // get an OPCItem by name

OPCItem GetOPCItem(int ServerHandle) // get an OPCItem by server handle

OPCItem AddItem(string ItemID, int ClientHandle) // add an OPCItem

AddItems(int NumItems, ref Array ItemIDs, ref Array ClientHandles, out Array ServerHandles, out Array Errors, object RequestedDataTypes, object AccessPaths) // add OPCItems in bulk

Remove(int NumItems, ref Array ServerHandles, out Array Errors) // remove OPCItems in bulk

Common properties

Count // number of OPCItem objects in the collection

The OPCItem object

OPCItem is the carrier of the data stored on the server; it holds a node's value, data quality, and timestamp.

Common methods

Read(short Source, out object Value, out object Quality, out object TimeStamp) // read data
void Write(object Value) // write a value

Common properties

ClientHandle // client handle
ServerHandle // server handle
AccessPath // access path
ItemID // item ID
Value // value
Quality // data quality
TimeStamp // update timestamp

Example code

OPCGroup LocalGroup = LocalGroups.Add("LocalGroups");
LocalGroup.IsActive = true;
LocalGroup.IsSubscribed = true;
LocalGroup.UpdateRate = 10000; // refresh every ten seconds
LocalGroup.DataChange += Group_DataChange; // data change event
LocalGroup.AsyncWriteComplete += LocalGroup_AsyncWriteComplete; // async write complete event
LocalGroup.AsyncReadComplete += LocalGroup_AsyncReadComplete; // async read complete event

Dictionary<int, string> ClientHandleItemIDPairs = new Dictionary<int, string>(); // maps client handles to ItemIDs (a field in the full program)
ClientHandleItemIDPairs.Add(0, ""); // index 0 placeholder; COM arrays are 1-based
ClientHandleItemIDPairs.Add(1, "Random.Int4");
ClientHandleItemIDPairs.Add(2, "Random.Int2");

try
{
Array ServerHandles;
Array Errors;
int CancelID;
// add monitored items in bulk
LocalGroup.OPCItems.AddItems(2, ClientHandleItemIDPairs.Values.ToArray(), ClientHandleItemIDPairs.Keys.ToArray(), out ServerHandles, out Errors);
// add a single monitored item
OPCItem SingleItem = LocalGroup.OPCItems.AddItem("Write Only.String", 3);
ClientHandleItemIDPairs.Add(3, "Write Only.String");

for (int i = 1; i <= ServerHandles.Length; i++)
{
Console.WriteLine($"Added Item: {LocalGroup.OPCItems.GetOPCItem((int)ServerHandles.GetValue(i)).ItemID}, Error: {LocalOpcServer.GetErrorString((int)Errors.GetValue(i))}");
}

Array outValues;
object outQualities;
object outTimestamps;



// synchronous read
LocalGroup.SyncRead((short)OPCAutomation.OPCDataSource.OPCDevice, ServerHandles.Length, ServerHandles, out outValues, out Errors, out outQualities, out outTimestamps);
for (int i = 1; i <= ServerHandles.Length; i++)
{

Console.WriteLine($"SyncRead Item {LocalGroup.OPCItems.GetOPCItem((int)ServerHandles.GetValue(i)).ItemID} Value: {outValues.GetValue(i)}, Quality: {(outQualities as Array).GetValue(i)}, Timestamp: {(outTimestamps as Array).GetValue(i)}, Error: {LocalOpcServer.GetErrorString((int)Errors.GetValue(i))}");
}

// the leading 0 and "" are required (COM arrays are 1-based); omitting them causes an error
int[] WriteServerHandles = new[] { 0, SingleItem.ServerHandle };
object[] WriteValues = new object[] { "", "Hello" };
// synchronous write
LocalGroup.SyncWrite(1, WriteServerHandles.ToArray(), WriteValues, out Errors);

// asynchronous read, TransactionID == 1
LocalGroup.AsyncRead(ServerHandles.Length, ServerHandles, out Errors, 1, out CancelID);
// asynchronous write, TransactionID == 2
LocalGroup.AsyncWrite(1, WriteServerHandles.ToArray(), WriteValues, out Errors, 2, out CancelID);
// asynchronous refresh, TransactionID == 3
LocalGroup.AsyncRefresh((short)OPCAutomation.OPCDataSource.OPCDevice, 3, out CancelID);
}
catch (Exception ex)
{

MessageBox.Show($"Error: {ex.Message} {ex.StackTrace}", "Error");
}

private void LocalGroup_AsyncWriteComplete(int TransactionID, int NumItems, ref Array ClientHandles, ref Array Errors)
{
if (TransactionID == 2)
{
for (int i = 1; i <= NumItems; i++)
{
Console.WriteLine($"Async Write {ClientHandleItemIDPairs[(int)ClientHandles.GetValue(i)]} Complete, Error: {LocalOpcServer.GetErrorString((int)Errors.GetValue(i))}");
}
}
}
private void LocalGroup_AsyncReadComplete(int TransactionID, int NumItems, ref Array ClientHandles, ref Array ItemValues, ref Array Qualities, ref Array TimeStamps, ref Array Errors)
{
if (TransactionID == 1)
{
for (int i = 1; i <= NumItems; i++)
{
Console.WriteLine($"Async Read {ClientHandleItemIDPairs[(int)ClientHandles.GetValue(i)]} Complete, Value: {ItemValues.GetValue(i)}, Quality: {GetQualityString((int)Qualities.GetValue(i))}, TimeStamps: {((DateTime)TimeStamps.GetValue(i)).ToString("yyyy-MM-dd HH:mm:ss")}, Error: {LocalOpcServer.GetErrorString((int)Errors.GetValue(i))}");
}
}
}

private void Group_DataChange(int TransactionID, int NumItems, ref Array ClientHandles, ref Array ItemValues, ref Array Qualities, ref Array TimeStamps)
{
for (int i = 1; i <= NumItems; i++)
{
Console.WriteLine($"DataChange {ClientHandleItemIDPairs[(int)ClientHandles.GetValue(i)]} Complete, Value: {ItemValues.GetValue(i)}, Quality: {GetQualityString((int)Qualities.GetValue(i))}, TimeStamps: {((DateTime)TimeStamps.GetValue(i)).ToString("yyyy-MM-dd HH:mm:ss")}");
}
}

public static string GetQualityString(int QualityCode)
{
switch (QualityCode)
{
case 0: return "Bad";
case 1: return "Bad, Low Limited";
case 2: return "Bad, High Limited";
case 3: return "Bad, Constant";
case 4: return "Bad, Configuration Error";
case 5: return "Bad, Configuration Error, Low Limited";
case 6: return "Bad, Configuration Error, High Limited";
case 7: return "Bad, Configuration Error, Constant";
case 8: return "Bad, Not Connected";
case 9: return "Bad, Not Connected, Low Limited";
case 10: return "Bad, Not Connected, High Limited";
case 11: return "Bad, Not Connected, Constant";
case 12: return "Bad, Device Failure";
case 13: return "Bad, Device Failure, Low Limited";
case 14: return "Bad, Device Failure, High Limited";
case 15: return "Bad, Device Failure, Constant";
case 16: return "Bad, Sensor Failure";
case 17: return "Bad, Sensor Failure, Low Limited";
case 18: return "Bad, Sensor Failure, High Limited";
case 19: return "Bad, Sensor Failure, Constant";
case 20: return "Bad, Last Known Value";
case 21: return "Bad, Last Known Value, Low Limited";
case 22: return "Bad, Last Known Value, High Limited";
case 23: return "Bad, Last Known Value, Constant";
case 24: return "Bad, Comm Failure";
case 25: return "Bad, Comm Failure, Low Limited";
case 26: return "Bad, Comm Failure, High Limited";
case 27: return "Bad, Comm Failure, Constant";
case 28: return "Bad, Out of Service";
case 29: return "Bad, Out of Service, Low Limited";
case 30: return "Bad, Out of Service, High Limited";
case 31: return "Bad, Out of Service, Constant";
case 32: return "Bad, Waiting for Initial Data";
case 33: return "Bad, Waiting for Initial Data, Low Limited";
case 34: return "Bad, Waiting for Initial Data, High Limited";
case 35: return "Bad, Waiting for Initial Data, Constant";
case 64: return "Uncertain";
case 65: return "Uncertain, Low Limited";
case 66: return "Uncertain, High Limited";
case 67: return "Uncertain, Constant";
case 68: return "Uncertain, Last Usable Value";
case 69: return "Uncertain, Last Usable Value, Low Limited";
case 70: return "Uncertain, Last Usable Value, High Limited";
case 71: return "Uncertain, Last Usable Value, Constant";
case 80: return "Uncertain, Sensor Not Accurate";
case 81: return "Uncertain, Sensor Not Accurate, Low Limited";
case 82: return "Uncertain, Sensor Not Accurate, High Limited";
case 83: return "Uncertain, Sensor Not Accurate, Constant";
case 84: return "Uncertain, Engineering Units Exceeded";
case 85: return "Uncertain, Engineering Units Exceeded, Low Limited";
case 86: return "Uncertain, Engineering Units Exceeded, High Limited";
case 87: return "Uncertain, Engineering Units Exceeded, Constant";
case 88: return "Uncertain, Sub-Normal";
case 89: return "Uncertain, Sub-Normal, Low Limited";
case 90: return "Uncertain, Sub-Normal, High Limited";
case 91: return "Uncertain, Sub-Normal, Constant";
case 192: return "Good";
case 193: return "Good, Low Limited";
case 194: return "Good, High Limited";
case 195: return "Good, Constant";
case 216: return "Good, Local Override";
case 217: return "Good, Local Override, Low Limited";
case 218: return "Good, Local Override, High Limited";
case 219: return "Good, Local Override, Constant";
default:
return "INVALIDE QUALITY CODE!";
}
}

Loading plugins

The Plugins constructor is called to initialize the Plugins object:

Plugins plugins = new Plugins(workerProps);

The constructor reads the plugin.path configuration entry as the list of directories from which plugins are loaded:

List<String> pluginLocations = WorkerConfig.pluginLocations(props);
public static List<String> pluginLocations(Map<String, String> props) {
String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
return locationList == null
? new ArrayList<String>()
: Arrays.asList(COMMA_WITH_WHITESPACE.split(locationList.trim(), -1));
}

Next, the paths obtained above are used to initialize the DelegatingClassLoader, whose initLoaders method is then called to load the plugins:

delegatingLoader = newDelegatingClassLoader(pluginLocations);
delegatingLoader.initLoaders();

initLoaders first calls initPluginLoader to set up the PluginLoaders, then calls addAllAliases to register aliases for the plugins:

for (String configPath : pluginPaths) {
initPluginLoader(configPath);
}
// Finally add parent/system loader.
initPluginLoader(CLASSPATH_NAME);
addAllAliases();

In initPluginLoader, for each path other than CLASSPATH_NAME, registerPlugin is first called to create a PluginClassLoader; from there the handling is the same as for the CLASSPATH_NAME path. The call chain is scanUrlsAndAddPlugins -> scanPluginPath -> getPluginDesc / getServiceLoaderPluginDesc, which discover implementations of Connector, Converter, HeaderConverter, Transformation, ConfigProvider, ConnectRestExtension, ConnectorClientConfigOverridePolicy, and so on; then scanUrlsAndAddPlugins -> loadJdbcDrivers loads any JDBC drivers found on the path.

private void scanUrlsAndAddPlugins(
ClassLoader loader,
URL[] urls,
Path pluginLocation
) throws InstantiationException, IllegalAccessException {
PluginScanResult plugins = scanPluginPath(loader, urls);
log.info("Registered loader: {}", loader);
if (!plugins.isEmpty()) {
......
}
// load JDBC drivers
loadJdbcDrivers(loader);
}

private PluginScanResult scanPluginPath(
ClassLoader loader,
URL[] urls
) throws InstantiationException, IllegalAccessException {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(urls);
builder.setScanners(new SubTypesScanner());
builder.useParallelExecutor();
Reflections reflections = new InternalReflections(builder);

return new PluginScanResult(
getPluginDesc(reflections, Connector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getPluginDesc(reflections, Transformation.class, loader),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
}

private <T> Collection<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
ClassLoader loader
) throws InstantiationException, IllegalAccessException {
Set<Class<? extends T>> plugins;
try {
plugins = reflections.getSubTypesOf(klass);
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any classes for URLs: " +
reflections.getConfiguration().getUrls(), e);
return Collections.emptyList();
}
......
return result;
}

private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
Collection<PluginDesc<T>> result = new ArrayList<>();
try {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (T pluginImpl : serviceLoader) {
result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(),
versionFor(pluginImpl), loader));
}
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
return result;
}
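
For context, the java.util.ServiceLoader mechanism that getServiceLoaderPluginDesc relies on can be illustrated with a minimal standalone sketch; the Greeter interface and its provider file are made-up names for this example, not Kafka Connect types:

// Minimal illustration of the ServiceLoader discovery mechanism used above.
// Providers are discovered from classpath files named
// META-INF/services/ServiceLoaderDemo$Greeter, each listing implementation
// class names; with no such files, the loop simply prints nothing.
import java.util.ServiceLoader;

public class ServiceLoaderDemo {
    public interface Greeter {
        String greet();
    }

    public static void main(String[] args) {
        ServiceLoader<Greeter> loader = ServiceLoader.load(Greeter.class);
        for (Greeter g : loader) {
            System.out.println(g.getClass().getName() + ": " + g.greet());
        }
    }
}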

Instantiating the Config

The config-loading logic is much simpler: ultimately ConfigDef.parse converts the supplied Map into key-value pairs that match the defined config keys.
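
As a minimal sketch of that parsing step (the keys and defaults below are made up for illustration, not Kafka Connect's real definitions):

// Sketch: ConfigDef validates and converts a raw string Map into typed values.
import org.apache.kafka.common.config.ConfigDef;
import java.util.HashMap;
import java.util.Map;

public class ConfigDefDemo {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
            .define("plugin.path", ConfigDef.Type.LIST, "", ConfigDef.Importance.HIGH, "Plugin directories")
            .define("rest.port", ConfigDef.Type.INT, 8083, ConfigDef.Importance.LOW, "REST port");

        Map<String, String> raw = new HashMap<>();
        raw.put("plugin.path", "/opt/connectors,/opt/extra");
        // rest.port is absent, so parse falls back to the declared default

        Map<String, Object> parsed = def.parse(raw);
        System.out.println(parsed.get("plugin.path")); // parsed as a List<String>
        System.out.println(parsed.get("rest.port"));   // 8083 (Integer default)
    }
}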

Cluster ID

The cluster ID is obtained by first creating a KafkaAdminClient via the KafkaAdminClient.createInternal method:

Admin adminClient = Admin.create(config.originals());

static Admin create(Map<String, Object> conf) {
return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
}

Then KafkaAdminClient.describeCluster().clusterId() is called to obtain the cluster ID; the actual logic lives in the callback that handles the Kafka response:

void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
describeClusterFuture.complete(response.brokers());
controllerFuture.complete(controller(response));
clusterIdFuture.complete(response.clusterId());
authorizedOperationsFuture.complete(
validAclOperations(response.clusterAuthorizedOperations()));
}
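
For reference, fetching the cluster ID from client code is just a future get on describeCluster(); a minimal sketch, assuming a broker reachable at localhost:9092:

// Minimal sketch: fetch the cluster id with the Admin client.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;

public class ClusterIdDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // describeCluster() returns futures; clusterId() completes in the
            // response callback shown above
            String clusterId = admin.describeCluster().clusterId().get();
            System.out.println("Cluster ID: " + clusterId);
        }
    }
}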

REST initialization

First, the config is passed to the RestServer constructor, which builds the listener addresses from the configured hostname and port and reads the adminListener addresses from the config:

RestServer rest = new RestServer(config);

//RestServer构造函数
List<String> listeners = parseListeners();
List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);

Next, the Jetty Server object and the context handler collection are created:

jettyServer = new Server();
handlers = new ContextHandlerCollection();

Then Connector objects are created from the listeners and adminListeners and added to the Jetty Server. For HTTPS listeners, an SSL/TLS context is built from the configuration; if the configuration carries no SSL settings, a default SSL/TLS context is generated.

createConnectors(listeners, adminListeners);

public Connector createConnector(String listener, boolean isAdmin) {
Matcher listenerMatcher = LISTENER_PATTERN.matcher(listener);

if (!listenerMatcher.matches())
throw new ConfigException("Listener doesn't have the right format (protocol://hostname:port).");

String protocol = listenerMatcher.group(1).toLowerCase(Locale.ENGLISH);

if (!PROTOCOL_HTTP.equals(protocol) && !PROTOCOL_HTTPS.equals(protocol))
throw new ConfigException(String.format("Listener protocol must be either \"%s\" or \"%s\".", PROTOCOL_HTTP, PROTOCOL_HTTPS));

String hostname = listenerMatcher.group(2);
int port = Integer.parseInt(listenerMatcher.group(3));

ServerConnector connector;

if (PROTOCOL_HTTPS.equals(protocol)) {
SslContextFactory ssl;
if (isAdmin) {
ssl = SSLUtils.createServerSideSslContextFactory(config, ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX);
} else {
ssl = SSLUtils.createServerSideSslContextFactory(config);
}
connector = new ServerConnector(jettyServer, ssl);
if (!isAdmin) {
connector.setName(String.format("%s_%s%d", PROTOCOL_HTTPS, hostname, port));
}
} else {
connector = new ServerConnector(jettyServer);
if (!isAdmin) {
connector.setName(String.format("%s_%s%d", PROTOCOL_HTTP, hostname, port));
}
}

if (isAdmin) {
connector.setName(ADMIN_SERVER_CONNECTOR_NAME);
}

if (!hostname.isEmpty())
connector.setHost(hostname);

connector.setPort(port);

return connector;
}

Finally, initializeServer is called to install the static resource handlers and start the Jetty Server.
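
As a rough illustration of that step (this is my own sketch, not Kafka Connect's actual initializeServer; the port and content directory are made up), an embedded Jetty 9 server with a static resource handler looks like this:

// Sketch: serve static files from ./www at /static on an embedded Jetty server.
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.ResourceHandler;

public class StaticJettyDemo {
    public static void main(String[] args) throws Exception {
        Server server = new Server();
        ServerConnector connector = new ServerConnector(server);
        connector.setPort(8083); // port chosen for the example
        server.addConnector(connector);

        ResourceHandler resources = new ResourceHandler();
        resources.setResourceBase("./www"); // hypothetical static content directory

        ContextHandler context = new ContextHandler("/static");
        context.setHandler(resources);

        ContextHandlerCollection handlers = new ContextHandlerCollection();
        handlers.addHandler(context);
        server.setHandler(handlers);

        server.start();
        server.join(); // block until the server stops
    }
}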

Configuring the offset store

KafkaOffsetBackingStore uses a topic to store the offsets of the topics consumed by connectors. The topic name is taken from the offset.storage.topic configuration key and defaults to connect-offsets. KafkaOffsetBackingStore creates a Producer to write offset updates to this topic, and a Consumer to pick up offset updates made by other connect nodes. The latest offsets are held in a HashMap named data:

private HashMap<ByteBuffer, ByteBuffer> data;

The callback that receives offset updates is:

private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
@Override
public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
data.put(key, value);
}
};

Instantiating the Worker

The Worker owns all Connector and Task instances on the node; creating, starting, and stopping connectors are all performed through the Worker. The Worker constructor contains no special logic; it mainly keeps references to objects created elsewhere.

Configuring the status store

KafkaStatusBackingStore works much like the offset store. The default status topic is connect-status, and it maintains the state of Connectors, Tasks, and Topics:

protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;

Its callback is:

Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
@Override
public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
read(record);
}
};

void read(ConsumerRecord<String, byte[]> record) {
String key = record.key();
if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
readConnectorStatus(key, record.value());
} else if (key.startsWith(TASK_STATUS_PREFIX)) {
readTaskStatus(key, record.value());
} else if (key.startsWith(TOPIC_STATUS_PREFIX)) {
readTopicStatus(key, record.value());
} else {
log.warn("Discarding record with invalid key {}", key);
}
}

Configuring the config store

KafkaConfigBackingStore again follows the same pattern; the default topic name is connect-configs. The variables holding connector and task configurations are:

private final Map<String, Integer> connectorTaskCounts = new HashMap<>();
private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();

Its callback is somewhat more involved. When it fires, KafkaConfigBackingStore first inspects the record contents, updates its local caches, and then notifies the DistributedHerder by calling the onXXX methods of the herder's ConfigUpdateListener:

private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
@Override
public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
if (error != null) {
log.error("Unexpected in consumer callback for KafkaConfigBackingStore: ", error);
return;
}

final SchemaAndValue value;
try {
value = converter.toConnectData(topic, record.value());
} catch (DataException e) {
log.error("Failed to convert config data to Kafka Connect format: ", e);
return;
}
// Make the recorded offset match the API used for positions in the consumer -- return the offset of the
// *next record*, not the last one consumed.
offset = record.offset() + 1;

if (record.key().startsWith(TARGET_STATE_PREFIX)) {
......
if (started && !removed)
updateListener.onConnectorTargetStateChange(connectorName);
} else if (record.key().startsWith(CONNECTOR_PREFIX)) {
......
if (started) {
if (removed)
updateListener.onConnectorConfigRemove(connectorName);
else
updateListener.onConnectorConfigUpdate(connectorName);
}
} else if (record.key().startsWith(TASK_PREFIX)) {
......
} else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
......
if (started)
updateListener.onTaskConfigUpdate(updatedTasks);
} else if (record.key().equals(SESSION_KEY_KEY)) {
......
if (started)
updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
} else {
log.error("Discarding config update record with invalid key: {}", record.key());
}
}

}

Initializing the Herder

The DistributedHerder constructor receives, directly or indirectly, the objects created earlier; it also creates a ConfigUpdateListener and hands it to configBackingStore, which drives the config-update logic described above:

configBackingStore.setUpdateListener(new ConfigUpdateListener());

Starting Connect

The Connect constructor receives the Herder and RestServer objects. When start runs, it first calls DistributedHerder.start, then RestServer.initializeResources to bind the Herder to the RestServer:

public void start() {
try {
log.info("Kafka Connect starting");
Exit.addShutdownHook("connect-shutdown-hook", shutdownHook);

herder.start();
rest.initializeResources(herder);

log.info("Kafka Connect started");
} finally {
startLatch.countDown();
}
}

Since DistributedHerder implements Runnable, its start method submits the herder itself to an executor as a new thread:

@Override
public void start() {
this.herderExecutor.submit(this);
}

@Override
public void run() {
try {
log.info("Herder starting");

startServices();

log.info("Herder started");

while (!stopping.get()) {
tick();
}

halt();

log.info("Herder stopped");
herderMetrics.close();
} catch (Throwable t) {
log.error("Uncaught exception in herder work thread, exiting: ", t);
Exit.exit(1);
}
}

startServices starts the worker, the status store, and the config store in turn; the worker's start in turn starts the offset store:

protected void startServices() {
this.worker.start();
this.statusBackingStore.start();
this.configBackingStore.start();
}

RestServer.initializeResources registers the REST paths "/", "connectors", and "connector-plugins" with the herder via ResourceConfig.register:

resourceConfig.register(new RootResource(herder));
resourceConfig.register(new ConnectorsResource(herder, config));
resourceConfig.register(new ConnectorPluginsResource(herder));

Then logging, CORS, and other settings are applied to the RestServer per the configuration; at this point, Kafka Connect is started.
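
For reference, resource classes like RootResource above are plain JAX-RS endpoints; a minimal made-up example of the kind of class that register exposes (the path and payload here are invented for illustration):

// Made-up minimal JAX-RS resource in the same style as RootResource etc.;
// ResourceConfig.register(...) makes its @Path/@GET endpoints servable by Jetty.
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/ping")
public class PingResource {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public String ping() {
        return "\"pong\"";
    }
}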

Configuring the Spring Boot project

  1. Add the CAS client dependency to the pom file
    <dependency>
    <groupId>org.jasig.cas.client</groupId>
    <artifactId>cas-client-support-springboot</artifactId>
    <version>3.6.2</version>
    </dependency>
  2. Add the following to application.properties
    #CAS Server URL
    cas.server-url-prefix=http://localhost:8443/cas
    #CAS Server login URL
    cas.server-login-url=http://localhost:8443/cas/login
    #URL of the protected application
    cas.client-host-url=http://localhost:8080
    #Validation protocol
    cas.validation-type=CAS3
  3. Add the @EnableCasClient annotation to the Spring Boot application class
    @EnableCasClient
    @SpringBootApplication
    public class CasDemoApplication {

    public static void main(String[] args) {
    SpringApplication.run(CasDemoApplication.class, args);
    }

    }
  4. Add a test controller
    @RestController
    public class HelloController {

    @GetMapping("/hello")
    public String hello(){
    return "Hello CAS!";
    }
    }
  5. Start the application and visit http://localhost:8080/hello. The browser redirects to the CAS login page as expected, but CAS responds with "Application Not Authorized to Use CAS". By default the server only authorizes https and imaps services, so we need to go back to the server set up in CAS 学习(一)–CAS Server 安装 and modify it.

Configuring the CAS server

  1. Change application.yml to

    server:
      ssl:
        enabled: false

    cas:
      service-registry:
        core:
          init-from-json: true
  2. Create a services directory under src/main/resources and add a new file HTTPSandIMAPS-10000001.json with the following content

    {
    "@class" : "org.apereo.cas.services.RegexRegisteredService",
    "serviceId" : "^(https|http|imaps)://.*",
    "name" : "HTTPS and IMAPS",
    "id" : 10000001,
    "description" : "This service definition authorizes all application urls that support HTTPS and IMAPS protocols.",
    "evaluationOrder" : 10000
    }

    Restart the CAS Server and visit http://localhost:8080/hello again; the "Application Not Authorized to Use CAS" error still appears. Time to debug the server and find the cause.

  3. Download the CAS Server source and attach a remote debugger on port 5005. It turns out that the getServiceRegistryInitializerServices method in CasServiceRegistryInitializationConfiguration.java validates the resource location with ResourceUtils.doesResourceExist, which has a bug: when the registry type is JSON, the location is the services directory, so res.contentLength() > 0 is always false, and the user-defined JSON files are never extracted and used.

In CasServiceRegistryInitializationConfiguration.java, the resource check calls ResourceUtils.doesResourceExist:

private Resource getServiceRegistryInitializerServicesDirectoryResource() {
val registry = casProperties.getServiceRegistry().getJson();
if (ResourceUtils.doesResourceExist(registry.getLocation())) {
LOGGER.debug("Using JSON service registry location [{}] for embedded service definitions", registry.getLocation());
return registry.getLocation();
}
val parent = new File(FileUtils.getTempDirectory(), "cas");
if (!parent.mkdirs()) {
LOGGER.warn("Unable to create folder [{}]", parent);
}
val resources = ResourcePatternUtils.getResourcePatternResolver(applicationContext)
.getResources("classpath*:/services/*.json");
Arrays.stream(resources)
.forEach(resource -> ResourceUtils.exportClasspathResourceToFile(parent, resource));
LOGGER.debug("Using service registry location [{}] for embedded service definitions", parent);
return new FileSystemResource(parent);
}

In ResourceUtils.java, res.contentLength() > 0 is always false:

public static boolean doesResourceExist(final Resource res) {
if (res == null) {
return false;
}
try {
IOUtils.read(res.getInputStream(), new byte[1]);
return res.contentLength() > 0;
} catch (final FileNotFoundException e) {
LOGGER.trace(e.getMessage());
return false;
} catch (final Exception e) {
LOGGER.trace(e.getMessage(), e);
return false;
}
}
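
A minimal sketch of one possible fix (my own, not necessarily the official CAS patch): since the one-byte read above already proves the resource is readable, return true on a successful read instead of relying on contentLength, which can report 0 for a directory-backed location.

// Sketch of one possible fix (not the official CAS patch): treat a successful
// read of the first byte as proof the resource exists, instead of relying on
// contentLength(), which can be 0 for directory-backed resources.
public static boolean doesResourceExist(final Resource res) {
    if (res == null) {
        return false;
    }
    try {
        IOUtils.read(res.getInputStream(), new byte[1]);
        return true; // the stream was readable, so the resource exists
    } catch (final Exception e) {
        LOGGER.trace(e.getMessage(), e);
        return false;
    }
}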
  4. After fixing the bug, restart the CAS Server and visit http://localhost:8080/hello again. The long-awaited login page finally appears; after entering the username and password, the redirect succeeds and Hello CAS! is printed.