1 介绍
Tip
- 集群的HTTP REST API接口,是集群控制的入口,包括认证授权、数据校验以及集群状态变更 等
- 将k8s “资源组/资源版本/资源” 以RESTful风格的形式对外暴露并提供服务
- k8s集群中的所有组件都通过kube-apiserver组件操作资源对象,也通过它让各组件可以通信和交互
- 只有API Server才直接操作etcd
2 go-restful
概念介绍
- Container 相当于一个 http server,不同的 container 监听不同的地址和端口
- 每个container可以包含多个WebService,相当于一组不同服务的分类 ,比如Pod作为一组,configmap作为一组
- 每个WebService包含多个Router(路由),Router根据http请求的URL路由到对应的处理函数(Handler)
package main
import (
// v3 支持go mod
"io"
"log"
"net/http"
restful "github.com/emicklei/go-restful/v3"
)
// main starts two go-restful HTTP servers: GET /hello is registered on
// go-restful's default container and served on :8080, while GET /ping lives
// on a dedicated container served on :8081.
func main() {
	// restful.Add registers the service on the package-level default container.
	helloService := &restful.WebService{}
	helloService.Route(helloService.GET("/hello").To(hello))
	restful.Add(helloService)

	// Serve the first listener in the background; a nil handler makes
	// net/http fall back to http.DefaultServeMux.
	go func() {
		err := http.ListenAndServe(":8080", nil)
		log.Fatal(err)
	}()

	// A second, independent container with its own address and routes.
	pingService := &restful.WebService{}
	pingService.Route(pingService.GET("/ping").To(pong))
	pingContainer := restful.NewContainer()
	pingContainer.Add(pingService)

	pingServer := &http.Server{
		Addr:    ":8081",
		Handler: pingContainer,
	}
	err := pingServer.ListenAndServe()
	log.Fatal(err)
}
// hello handles GET /hello by writing the body "world".
// A failed write (for example, the client disconnected) is logged instead of
// being silently discarded.
func hello(req *restful.Request, resp *restful.Response) {
	if _, err := io.WriteString(resp, "world"); err != nil {
		log.Printf("hello: writing response: %v", err)
	}
}
func pong(req *restful.Request, resp *restful.Response) {
io.WriteString(resp, "pong")
}3 访问控制
API Server 权限管理流程
3.1 认证 Authentication
Tip
针对请求的认证,确认是否具有访问Kubernetes集群的权限
vendor/k8s.io/apiserver/pkg/endpoints/filters/authentication.go
// withAuthentication wraps handler with request authentication.
//
// When auth is nil it logs a warning and returns handler unchanged. Otherwise
// the returned handler authenticates every request with auth: requests that
// fail authentication, or whose response audiences are not acceptable for
// apiAuds, are passed to failed; authenticated requests are forwarded to
// handler with the authenticated user stored in the request context.
// metrics is invoked once per request with the outcome and timing.
func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
if auth == nil {
klog.Warning("Authentication is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
authenticationStart := time.Now()
// Attach the API audiences to the context before authenticating.
if len(apiAuds) > 0 {
req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
}
resp, ok, err := auth.AuthenticateRequest(req)
authenticationFinish := time.Now()
// Deferred so the final value of err (it may be reassigned below) is recorded.
defer func() {
metrics(req.Context(), resp, ok, err, apiAuds, authenticationStart, authenticationFinish)
}()
// Either an error or a plain "not authenticated" result is a failure;
// only an actual error is logged.
if err != nil || !ok {
if err != nil {
klog.ErrorS(err, "Unable to authenticate the request")
}
failed.ServeHTTP(w, req)
return
}
if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
err = fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
klog.Error(err)
failed.ServeHTTP(w, req)
return
}
// authorization header is not required anymore in case of a successful authentication.
req.Header.Del("Authorization")
// Make the authenticated user available to downstream handlers via the context.
req = req.WithContext(genericapirequest.WithUser(req.Context(), resp.User))
handler.ServeHTTP(w, req)
})
}staging/src/k8s.io/apiserver/pkg/authentication/request/union/union.go
// AuthenticateRequest tries each handler in authHandler.Handlers in order and
// returns the result of the first one that reports ok.
//
// A handler error is returned immediately when FailOnError is set; otherwise
// it is collected and the next handler is tried. If no handler succeeds, the
// result is (nil, false, aggregate of the collected errors).
func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
var errlist []error
for _, currAuthRequestHandler := range authHandler.Handlers {
resp, ok, err := currAuthRequestHandler.AuthenticateRequest(req)
if err != nil {
if authHandler.FailOnError {
return resp, ok, err
}
errlist = append(errlist, err)
continue
}
if ok {
// Several authentication methods are chained; one success is enough.
return resp, ok, err
}
}
return nil, false, utilerrors.NewAggregate(errlist)
}3.1.1 自定义Webhook
Tip
当客户端发送的认证请求到达kube-apiserver 时,kube-apiserver回调设置的webhook方法,将验证信息发送给远程的Webhook服务器进行认证,然后根据Webhook服务器返回的状态码来判断是否认证成功
源码
staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook.go
// AuthenticateToken sends token to the remote webhook wrapped in a
// TokenReview and converts the returned status into an authenticator.Response.
//
// It returns (nil, false, err) when the review status is not authenticated,
// where err carries Status.Error if that field is non-empty and is nil otherwise.
//
// NOTE(review): this excerpt appears abridged. wantAuds, auds, result,
// statusCode and tokenReviewErr are not declared in the visible lines, and
// tokenReviewErr is not checked here; consult the upstream file before
// relying on the exact control flow.
func (w *WebhookTokenAuthenticator) AuthenticateToken(ctx context.Context, token string) (*authenticator.Response, bool, error) {
// Build the review request carrying the token to verify.
r := &authenticationv1.TokenReview{
Spec: authenticationv1.TokenReviewSpec{
Token: token,
Audiences: wantAuds,
},
}
// Call the remote webhook and copy its verdict back into r.
result, statusCode, tokenReviewErr = w.tokenReview.Create(ctx, r, metav1.CreateOptions{})
r.Status = result.Status
if !r.Status.Authenticated {
var err error
if len(r.Status.Error) != 0 {
err = errors.New(r.Status.Error)
}
return nil, false, err
}
// Copy the extra attributes into a plain map[string][]string; extra stays
// nil when the webhook returned none.
var extra map[string][]string
if r.Status.User.Extra != nil {
extra = map[string][]string{}
for k, v := range r.Status.User.Extra {
extra[k] = v
}
}
return &authenticator.Response{
User: &user.DefaultInfo{
Name: r.Status.User.Username,
UID: r.Status.User.UID,
Groups: r.Status.User.Groups,
Extra: extra,
},
Audiences: auds,
}, true, nil
}staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook.go
// Create POSTs tokenReview to the "tokenreviews" resource of the webhook and
// decodes the reply into result. statusCode receives the HTTP status of the
// reply and err the outcome of decoding it.
func (c *tokenReviewV1Client) Create(ctx context.Context, tokenReview *authenticationv1.TokenReview, opts metav1.CreateOptions) (result *authenticationv1.TokenReview, statusCode int, err error) {
result = &authenticationv1.TokenReview{}
restResult := c.client.Post().
Resource("tokenreviews").
VersionedParams(&opts, scheme.ParameterCodec).
// The request body is an authenticationv1.TokenReview object.
Body(tokenReview).
Do(ctx)
restResult.StatusCode(&statusCode)
err = restResult.Into(result)
return
}- kubeconfig 配置文件添加用户
~/.kube/config
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: ...
server: https://192.168.66.100:6443
name: kubernetes
contexts:
- context:
cluster: kubernetes
user: kubernetes-admin
name: kubernetes-admin@kubernetes
current-context: kubernetes-admin@kubernetes
kind: Config
preferences: {}
users:
- name: kubernetes-admin
user:
client-certificate-data: ...
client-key-data: ...
## 添加一个用户
- name: xyz
user:
token: abc
k get po
# 默认是用这个用户去访问集群的. 用这个用户去认证
k get po --user=kubernetes-admin
# 现在我们用我们创建的用户去访问.
# 发现无法识别你的用户时,就会去看你是否配置了webhook,有就将这个转发给它认证
# 这里会提示你 You must be logged in to the server (Unauthorized)
k get po --user=xyz
- 创建一个webhook配置文件1
# 目录随意
mkdir -p /etc/kubernetes/abc
cd /etc/kubernetes/abc
# 1. 将集群详细信息添加到配置文件中
# --server= 指定我们将来apiserver 转发去认证的服务地址
kubectl config --kubeconfig=webhook.yaml set-cluster my-test --server=http://192.168.1.100:3000/authenticate
# 2. 将用户详细信息添加到配置文件中
# 创建一个用户 xyz , 这里token 我们随便写一个
kubectl config --kubeconfig=webhook.yaml set-credentials xyz --token=abc
# 3. 将上下文详细信息添加到配置文件中
# 添加一个上下文, 将集群和用户关联起来 --namespace=default
kubectl config --kubeconfig=webhook.yaml set-context my-hook --cluster=my-test --user=xyz
# 4. 设置当前上下文
kubectl config --kubeconfig=webhook.yaml use-context my-hook
# 会在当前目录生成一个 webhook.yaml文件
webhook.yaml
- 创建服务,用来认证
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
authentication "k8s.io/api/authentication/v1beta1"
)
// main exposes the TokenReview webhook at /authenticate on port 3000 and logs
// the error returned when the listener stops.
func main() {
	http.HandleFunc("/authenticate", auth)

	err := http.ListenAndServe(":3000", nil)
	log.Println(err)
}
func auth(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var tr authentication.TokenReview
err := decoder.Decode(&tr)
if err != nil {
log.Println("[Error]", err.Error())
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]interface{}{
"apiVersion": "authentication.k8s.io/v1beta1",
"kind": "TokenReview",
"status": authentication.TokenReviewStatus{
Authenticated: false,
},
})
return
}
fmt.Println("tr:::", tr)
log.Print("receving request")
// 这里 随便 验证. (这里写的很粗糙,就为了测试,不用管)
if tr.Spec.Token == "abc" {
fmt.Println(888)
w.WriteHeader(http.StatusOK)
trs := authentication.TokenReviewStatus{
Authenticated: true,
User: authentication.UserInfo{
Username: "xyz",
UID: "xyz",
},
}
json.NewEncoder(w).Encode(map[string]interface{}{
"apiVersion": "authentication.k8s.io/v1beta1",
"kind": "TokenReview",
"status": trs,
})
return
}
w.WriteHeader(http.StatusUnauthorized)
json.NewEncoder(w).Encode(map[string]interface{}{
"apiVersion": "authentication.k8s.io/v1beta1",
"kind": "TokenReview",
"status": authentication.TokenReviewStatus{
Authenticated: false,
},
})
return
}- 修改kube-apiserver pod的配置
/etc/kubernetes/manifests/kube-apiserver.yaml
...
spec:
containers:
- command:
- kube-apiserver
- ...
- --authentication-token-webhook-config-file=/etc/config/apiserver/webhook.yaml
- --authentication-token-webhook-cache-ttl=2m #用来设定身份认证决定的缓存时间,默认时长就是2分钟
volumeMounts:
- ...
- mountPath: /etc/config/apiserver
name: webhook
readOnly: true
volumes:
- ...
- hostPath:
path: /etc/kubernetes/abc
type: DirectoryOrCreate
name: webhook
等待 apiserver 自动重启,或者手动重启 kubelet:
systemctl restart kubelet
使用用户 xyz 来访问
会报错,提示Error from server (Forbidden): pods is forbidden: User “xyz” cannot list resource “pods” in API group “” in the namespace “default”
这说明是ok了, 已经过了认证, 到了鉴权阶段了,提示没有权限
3.3 准入 Admission
Tip
对象被持久化之前,拦截kube-apiserver的请求,拦截后的请求进入准入控制器中处理,对请求的资源对象进行自定义
前面提到的创建的pod默认会设置一个投射卷, 就是准入控制器做的.
准入控制支持同时开启多个插件,它们依次调用,只有全部插件都通过的请求才可以放过进入系统
3.3.1 准入 Webhook
Tip
- 除默认的准入控制插件以外, k8s预留了准入控制插件的扩展点,用户可自定义准入控制 插件实现自定义准入功能
- 准入 Webhook 是一种用于接收准入请求并对其进行处理的 HTTP 回调机制。
- 可以定义两种类型的准入 webhook,即 验证性质的准入 Webhook(ValidatingWebhookConfiguration) 和 修改性质的准入 Webhook (MutatingWebhookConfiguration)。
- 修改性质的准入 Webhook 会先被调用。它们可以更改发送到 API 服务器的对象以执行自定义的设置默认值操作
我们通过创建一个MutatingWebhookConfiguration,里面指定一个service服务,这个service就是webhook 服务,我这个service 是指向外部的应用,非pod
# 直接生成 根私钥 和 根证书
# -nodes 表示 no des 不加密的意思
# openssl 的命令真的... , 建议 用逻辑上 一步一步来, 先私钥,再生成自签名 这样来.
# 这里 我也记录一下, 以免看到这样的东西 觉得陌生.
openssl req -nodes -x509 -new -keyout ca.key -subj "/CN=x.com" -days 5000 -out ca.crt
openssl genrsa -out server.key 2048
cat >server.conf<<EOF
[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name
# 表示 生成csr时,将直接从配置文件中读取关于申请者字段的信息,不会提示输入
# req_distinguished_name 里的字段
prompt = no
[req_distinguished_name]
# CN是 必填的, 其他也可以写, 什么国家 部门..
CN = svc-mutate.default.svc
[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, serverAuth
subjectAltName = @alt_names
[alt_names]
DNS.1 = svc-mutate.default.svc # svc
IP.1 = 10.98.140.200 # svc的cluster ip
EOF
# -subj "/CN=svc-mutate.default.svc" 可以不用写
openssl req -new -key server.key -out server.csr -config server.conf
openssl x509 -req -CA ca.crt -CAkey ca.key -CAcreateserial -in server.csr -out server.crt -extensions v3_req -extfile server.conf
- 目录结构
# go 代码
tree
├── go.mod
├── main.go
└── pki
├── server.crt # 前面生成的
├── server.csr
└── server.key # 前面生成的
- go代码
main.go 上面那个试过效果再来用这个实际示例,参考官方
package main
import (
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
v1 "k8s.io/api/admission/v1"
"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/klog/v2"
)
// Config names the certificate and private-key files that configTLS passes
// to tls.LoadX509KeyPair.
type Config struct {
	// CertFile is the path of the serving certificate; KeyFile is the path
	// of its private key.
	CertFile, KeyFile string
}
// configTLS loads the certificate/key pair named by config and returns a
// server-side TLS configuration that presents it.
//
// The process exits via klog.Fatal if the pair cannot be loaded.
// MinVersion is pinned to TLS 1.2 so the webhook refuses TLS 1.0/1.1 no
// matter which default the Go toolchain used to build it applies.
func configTLS(config Config) *tls.Config {
	sCert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
	if err != nil {
		klog.Fatal(err)
	}
	return &tls.Config{
		Certificates: []tls.Certificate{sCert},
		MinVersion:   tls.VersionTLS12,
		// Client-certificate verification is intentionally left disabled:
		// ClientAuth: tls.RequireAndVerifyClientCert,
	}
}
// main serves the mutating admission webhook over HTTPS on port 443 with the
// certificate pair stored under ./pki, and logs the error returned when the
// server stops.
func main() {
	const port = 443

	certs := Config{
		CertFile: "./pki/server.crt",
		KeyFile:  "./pki/server.key",
	}

	http.HandleFunc("/add-label", serveAddLabel)

	srv := &http.Server{
		Addr:      fmt.Sprintf(":%d", port),
		TLSConfig: configTLS(certs),
	}
	// Empty file names: the certificate comes from srv.TLSConfig.
	err := srv.ListenAndServeTLS("", "")
	log.Println(err)
}
// serveAddLabel is the HTTP handler for the /add-label admission webhook.
//
// It reads an AdmissionReview (v1 or v1beta1) from the request body, runs the
// add-label admission logic for that version, and writes back an
// AdmissionReview of the same group/version/kind whose Response carries the
// UID of the request.
//
// Every failure path answers with an explicit HTTP error so the API server
// never receives an empty 200 reply:
//   - unreadable body, undecodable body, unsupported kind, missing request: 400
//   - Content-Type other than application/json: 415
//   - decoded object of an unexpected Go type, or marshal failure: 500
func serveAddLabel(w http.ResponseWriter, r *http.Request) {
	admit := admitHandler{
		v1beta1: delegateV1beta1AdmitToV1(addLabel),
		v1:      addLabel,
	}

	// reject logs msg and sends it to the client with the given status code.
	reject := func(code int, msg string) {
		klog.Error(msg)
		http.Error(w, msg, code)
	}

	var body []byte
	if r.Body != nil {
		data, err := ioutil.ReadAll(r.Body)
		if err != nil {
			reject(http.StatusBadRequest, fmt.Sprintf("Request body could not be read: %v", err))
			return
		}
		body = data
	}
	// Debug aid: print the raw request payload.
	fmt.Println("handling request:", string(body))

	// verify the content type is accurate
	contentType := r.Header.Get("Content-Type")
	if contentType != "application/json" {
		reject(http.StatusUnsupportedMediaType,
			fmt.Sprintf("contentType=%s, expect application/json", contentType))
		return
	}

	// An explicit target object is handed to Decode; according to the
	// original author's note, Decode(body, nil, nil) as used in the upstream
	// example returns an error in this program.
	// NOTE(review): with a target supplied, obj may come back as the target's
	// type even when the payload declares another version; the type
	// assertions below report that case to the client.
	var admissionReviewReq v1beta1.AdmissionReview
	deserializer := codecs.UniversalDeserializer()
	obj, gvk, err := deserializer.Decode(body, nil, &admissionReviewReq)
	if err != nil {
		reject(http.StatusBadRequest, fmt.Sprintf("Request could not be decoded: %v", err))
		return
	}

	var responseObj runtime.Object
	switch *gvk {
	case v1beta1.SchemeGroupVersion.WithKind("AdmissionReview"):
		requested, ok := obj.(*v1beta1.AdmissionReview)
		if !ok {
			reject(http.StatusInternalServerError,
				fmt.Sprintf("Expected v1beta1.AdmissionReview but got: %T", obj))
			return
		}
		// Guard against a review without a request, which would otherwise
		// cause a nil-pointer dereference below.
		if requested.Request == nil {
			reject(http.StatusBadRequest, "AdmissionReview has no request")
			return
		}
		review := &v1beta1.AdmissionReview{}
		review.SetGroupVersionKind(*gvk)
		review.Response = admit.v1beta1(*requested)
		review.Response.UID = requested.Request.UID
		responseObj = review
	case v1.SchemeGroupVersion.WithKind("AdmissionReview"):
		requested, ok := obj.(*v1.AdmissionReview)
		if !ok {
			reject(http.StatusInternalServerError,
				fmt.Sprintf("Expected v1.AdmissionReview but got: %T", obj))
			return
		}
		if requested.Request == nil {
			reject(http.StatusBadRequest, "AdmissionReview has no request")
			return
		}
		review := &v1.AdmissionReview{}
		review.SetGroupVersionKind(*gvk)
		review.Response = admit.v1(*requested)
		review.Response.UID = requested.Request.UID
		responseObj = review
	default:
		reject(http.StatusBadRequest, fmt.Sprintf("Unsupported group version kind: %v", gvk))
		return
	}

	klog.V(2).Infof("sending response: %v", responseObj)
	respBytes, err := json.Marshal(responseObj)
	if err != nil {
		reject(http.StatusInternalServerError, err.Error())
		return
	}
	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(respBytes); err != nil {
		klog.Error(err)
	}
}
// JSON Patch documents returned by addLabel. Each one leaves the object with
// the label "added-label" set to "yes"; which one is used depends on the
// labels the object already has.
const (
// addFirstLabelPatch is used when the object has no labels: it adds the
// whole /metadata/labels map.
addFirstLabelPatch string = `[
{ "op": "add", "path": "/metadata/labels", "value": {"added-label": "yes"}}
]`
// addAdditionalLabelPatch is used when the object has labels but not
// "added-label": it adds that single key.
addAdditionalLabelPatch string = `[
{ "op": "add", "path": "/metadata/labels/added-label", "value": "yes" }
]`
// updateLabelPatch is used when "added-label" exists with a value other
// than "yes": it replaces the value.
updateLabelPatch string = `[
{ "op": "replace", "path": "/metadata/labels/added-label", "value": "yes" }
]`
)
// addLabel is the v1 admission callback: it always allows the request and,
// unless the object already carries the label added-label=yes, attaches a
// JSON patch that sets it.
//
// If the object's metadata cannot be decoded, the error is logged and turned
// into a response via toV1AdmissionResponse.
func addLabel(ar v1.AdmissionReview) *v1.AdmissionResponse {
	klog.V(2).Info("calling add-label")

	// Only the metadata of the submitted object is needed.
	var target struct {
		metav1.ObjectMeta `json:"metadata,omitempty"`
	}
	if err := json.Unmarshal(ar.Request.Object.Raw, &target); err != nil {
		klog.Error(err)
		return toV1AdmissionResponse(err)
	}

	// Choose the patch matching the current label state; an empty patch
	// means the label is already set as wanted.
	var patch string
	switch current, present := target.Labels["added-label"]; {
	case len(target.Labels) == 0:
		patch = addFirstLabelPatch
	case !present:
		patch = addAdditionalLabelPatch
	case current != "yes":
		patch = updateLabelPatch
	}

	resp := &v1.AdmissionResponse{Allowed: true}
	if patch != "" {
		jsonPatch := v1.PatchTypeJSONPatch
		resp.Patch = []byte(patch)
		resp.PatchType = &jsonPatch
	}
	return resp
}
// admitv1beta1Func is an admission callback operating on v1beta1 AdmissionReview objects.
type admitv1beta1Func func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse
// admitv1Func is an admission callback operating on v1 AdmissionReview objects.
type admitv1Func func(v1.AdmissionReview) *v1.AdmissionResponse
// scheme holds the API types the webhook is able to decode.
var scheme = runtime.NewScheme()

// codecs builds serializers and deserializers on top of scheme.
var codecs = serializer.NewCodecFactory(scheme)

// init registers both AdmissionReview versions with scheme. With the types
// known, the deserializer can create an object of the version declared in
// the payload instead of always filling whatever target the caller supplied,
// so admission.k8s.io/v1 requests decode into *v1.AdmissionReview.
func init() {
	registrars := []func(*runtime.Scheme) error{
		v1.AddToScheme,
		v1beta1.AddToScheme,
	}
	for _, register := range registrars {
		if err := register(scheme); err != nil {
			// A registration failure is a programming error; abort at start-up.
			panic(err)
		}
	}
}
// delegateV1beta1AdmitToV1 adapts a v1 admission callback so it can serve
// v1beta1 reviews: the incoming request is converted to v1, f is invoked,
// and f's response is converted back to v1beta1.
func delegateV1beta1AdmitToV1(f admitv1Func) admitv1beta1Func {
	return func(review v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
		return convertAdmissionResponseToV1beta1(f(v1.AdmissionReview{
			Request: convertAdmissionRequestToV1(review.Request),
		}))
	}
}
// admitHandler pairs the admission callbacks for the two supported
// AdmissionReview versions.
type admitHandler struct {
// v1beta1 handles admission.k8s.io/v1beta1 reviews.
v1beta1 admitv1beta1Func
// v1 handles admission.k8s.io/v1 reviews.
v1 admitv1Func
}
// convertAdmissionRequestToV1 copies a v1beta1 AdmissionRequest field by
// field into a new v1 AdmissionRequest. r must not be nil.
func convertAdmissionRequestToV1(r *v1beta1.AdmissionRequest) *v1.AdmissionRequest {
	var out v1.AdmissionRequest

	out.UID = r.UID
	out.Kind = r.Kind
	out.Resource = r.Resource
	out.SubResource = r.SubResource
	out.RequestKind = r.RequestKind
	out.RequestResource = r.RequestResource
	out.RequestSubResource = r.RequestSubResource
	out.Name = r.Name
	out.Namespace = r.Namespace
	// Operation is a distinct named type in each API version.
	out.Operation = v1.Operation(r.Operation)
	out.UserInfo = r.UserInfo
	out.Object = r.Object
	out.OldObject = r.OldObject
	out.DryRun = r.DryRun
	out.Options = r.Options

	return &out
}
// convertAdmissionRequestToV1beta1 copies a v1 AdmissionRequest field by
// field into a new v1beta1 AdmissionRequest. r must not be nil.
func convertAdmissionRequestToV1beta1(r *v1.AdmissionRequest) *v1beta1.AdmissionRequest {
	var out v1beta1.AdmissionRequest

	out.UID = r.UID
	out.Kind = r.Kind
	out.Resource = r.Resource
	out.SubResource = r.SubResource
	out.RequestKind = r.RequestKind
	out.RequestResource = r.RequestResource
	out.RequestSubResource = r.RequestSubResource
	out.Name = r.Name
	out.Namespace = r.Namespace
	// Operation is a distinct named type in each API version.
	out.Operation = v1beta1.Operation(r.Operation)
	out.UserInfo = r.UserInfo
	out.Object = r.Object
	out.OldObject = r.OldObject
	out.DryRun = r.DryRun
	out.Options = r.Options

	return &out
}
// convertAdmissionResponseToV1 copies a v1beta1 AdmissionResponse into a new
// v1 AdmissionResponse. PatchType stays nil when the source has none;
// otherwise it points at a freshly converted copy. r must not be nil.
func convertAdmissionResponseToV1(r *v1beta1.AdmissionResponse) *v1.AdmissionResponse {
	out := &v1.AdmissionResponse{
		UID:              r.UID,
		Allowed:          r.Allowed,
		Result:           r.Result,
		Patch:            r.Patch,
		AuditAnnotations: r.AuditAnnotations,
		Warnings:         r.Warnings,
	}
	if r.PatchType != nil {
		converted := v1.PatchType(*r.PatchType)
		out.PatchType = &converted
	}
	return out
}
// convertAdmissionResponseToV1beta1 copies a v1 AdmissionResponse into a new
// v1beta1 AdmissionResponse. PatchType stays nil when the source has none;
// otherwise it points at a freshly converted copy. r must not be nil.
func convertAdmissionResponseToV1beta1(r *v1.AdmissionResponse) *v1beta1.AdmissionResponse {
	out := &v1beta1.AdmissionResponse{
		UID:              r.UID,
		Allowed:          r.Allowed,
		Result:           r.Result,
		Patch:            r.Patch,
		AuditAnnotations: r.AuditAnnotations,
		Warnings:         r.Warnings,
	}
	if r.PatchType != nil {
		converted := v1beta1.PatchType(*r.PatchType)
		out.PatchType = &converted
	}
	return out
}
func toV1AdmissionResponse(err error) *v1.AdmissionResponse {
return &v1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}- 启动服务
- curl简单验证
关于证书
ca证书是为了安全拿到 webhook服务 的公钥,具体参考https
service 指定外部应用,我们的webhook服务在外部
apiVersion: v1
kind: Service
metadata:
labels:
app: svc-mutate
name: svc-mutate
spec:
ports:
- name: abc
port: 443
protocol: TCP
targetPort: 443
type: ClusterIP
---
apiVersion: v1
kind: Endpoints
metadata:
name: svc-mutate
namespace: default
subsets:
- addresses:
- ip: 192.168.1.104 # 这个是我们go webhook 服务的ip
ports:
- port: 443
name: abc
这个MutatingWebhookConfiguration一创建, 你再create pod 就会去回调配置里的service指向的webhook 服务
mutate.yaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: "pod-policy.example.com" # 随便起
webhooks:
- name: my-webhook.example.com # 随便起
sideEffects: None
admissionReviewVersions: ["v1","v1beta1"]
clientConfig:
service: # 指定webhook的类型, 这里是 service
namespace: "default"
name: "svc-mutate" # service 名称
path: "/add-label" # 访问路径
caBundle: <openssl base64 -A <ca.crt 的结果> #(1)
rules:
- operations: ["CREATE"] # 所有创建的操作 都会回调这个 hook
apiGroups: ["*"]
apiVersions: ["*"]
resources: ["*"]
scope: "*"
- caBundle 的值是
openssl base64 -A <ca.crt
想要和https服务打交道, 就需要根证书(或自签名证书)