最近開發中,本地服務需要通過 gRPC 調用開發環境的雲端服務進行調試,而雲端服務並未開放外部端口,因此想到了 k8s 的 portforward 功能。此前也只是在書本或文章中獲知有此功能,未深入了解和使用。趁著這次機會,對 portforward 進行了一番研究,與大家分享一下自己的歷程。
portforward 的調用流程大概如下:

首先會在本地通過 kubectl 建立一個使用 SPDY 協議的 REST 連接,把本地的請求發往 k8s 對應 pod 的子資源 portforward。
// ForwardPorts builds an SPDY round tripper and upgrader from opts.Config,
// wraps them in a dialer for the given HTTP method and URL, and then runs a
// port forwarder created from the addresses, ports and channels carried in
// opts, writing to f.Out and f.ErrOut.
//
// NOTE(review): this is an abridged excerpt; the err values assigned below are
// presumably checked in the omitted code — confirm against the full source.
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
// Unrelated code omitted here.
// ...
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
// Unrelated code omitted here.
// ...
return fw.ForwardPorts()
}
// 此處省略無關代碼
// ...
// RunPortForward implements all the necessary functionality for port-forward cmd.
//
// It fetches the pod named o.PodName in o.Namespace, builds a POST request
// against that pod's "portforward" subresource, and hands the request URL
// (together with the options themselves) to o.PortForwarder.ForwardPorts.
func (o PortForwardOptions) RunPortForward() error {
pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
// Unrelated code omitted here.
// ...
// Target: POST .../namespaces/<ns>/pods/<name>/portforward (path shape assumed from the builder calls — TODO confirm).
req := o.RESTClient.Post().
Resource("pods").
Namespace(o.Namespace).
Name(pod.Name).
SubResource("portforward")
return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
}
注:kubectl 源碼片斷(kubectl/pkg/cmd/portforward/portforward.go)
請求到了 k8s api server 端,由 PortForwardREST 的 Connect 方法返回一個代理 handler,負責把請求轉發到 pod 所在節點:
// Connect returns a handler for the pod portforward proxy.
//
// It type-asserts opts to *api.PodPortForwardOptions, resolves the forwarding
// location and transport through pod.PortForwardLocation, and wraps them in
// the handler built by newThrottledUpgradeAwareProxyHandler.
func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
portForwardOpts, ok := opts.(*api.PodPortForwardOptions)
// ... (omitted in this excerpt; presumably handles !ok)
location, transport, err := pod.PortForwardLocation(ctx, r.Store, r.KubeletConn, name, portForwardOpts)
// ... (omitted in this excerpt; presumably handles err)
// The two booleans are wrapTransport=false and upgradeRequired=true.
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}
// newThrottledUpgradeAwareProxyHandler creates an upgrade-aware proxy handler
// for location/transport whose MaxBytesPerSec is set from the
// PerConnectionBandwidthLimitBytesPerSec capability. Errors are reported
// through responder, wrapped by proxy.NewErrorResponder.
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
	errResponder := proxy.NewErrorResponder(responder)
	h := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, errResponder)

	// Apply the configured per-connection bandwidth limit to this handler.
	limit := capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
	h.MaxBytesPerSec = limit
	return h
}
注:k8s 源碼片斷(kubernetes/pkg/registry/core/pod/rest/subresources.go)
在 kubelet server 中監聽 /portForward,找到對應的 pod,將請求代理給它。
// getPortForward handles a new restful port forward request.
//
// As shown in this excerpt, it extracts the request parameters, builds V4
// port-forward options from the HTTP request, looks the pod up by namespace
// and name, asks the host for a port-forward URL for that pod (name,
// namespace and UID), and proxies the stream to that URL.
//
// NOTE(review): the original comment said this "calls ServePortForward"; the
// visible code calls proxyStream with the URL from GetPortForward instead.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
params := getPortForwardRequestParams(request)
portForwardOptions, err := portforward.NewV4Options(request.Request)
// ... (omitted in this excerpt; presumably handles err)
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
// ... (omitted in this excerpt; presumably handles a missing pod)
url, err := s.host.GetPortForward(request.Request.Context(), pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
// ... (omitted in this excerpt; presumably handles err)
proxyStream(response.ResponseWriter, request.Request, url)
}
注:k8s 源碼片斷(kubernetes/pkg/kubelet/server/server.go)
同時,容器運行時 CRI 要實現 PortForward 接口,到此,就將本地的請求轉發到了目標 pod 了。
// PortForwarder knows how to forward content from a data stream to/from a port in a pod.
type PortForwarder interface {
// PortForward copies data between a data stream and a port in a pod.
// The pod is identified by name and uid, the target by port, and the data
// is exchanged over stream.
PortForward(ctx context.Context, name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
}
// ServePortForward serves one port forwarding request.
//
// The request stays alive for as long as the client is alive and the
// connection has not timed out from idleness. One invocation handles multiple
// forwarded connections; for example, several `curl http://localhost:8888/`
// calls are all served by the same ServePortForward call.
//
// WebSocket requests are dispatched to handleWebSocketStreams (which also
// receives portForwardOptions); every other request goes to handleHTTPStreams.
// Any error from either handler is reported through runtime.HandleError.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
	var streamErr error
	switch {
	case wsstream.IsWebSocketRequest(req):
		streamErr = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
	default:
		streamErr = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
	}

	if streamErr == nil {
		return
	}
	runtime.HandleError(streamErr)
}
注:k8s 源碼片斷(kubernetes/pkg/kubelet/cri/streaming/portforward/portforward.go)
至此,對整個 portforward 流程的代碼,心里有了一條清晰的調用鏈。