亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

淺析k8s的portforward

2023-06-30 06:03:23
95
0

最(zui)近(jin)開(kai)發中,本地服務(wu)(wu)需要(yao)grpc調用(yong)開(kai)發環(huan)境的云端(duan)(duan)服務(wu)(wu)進行調試,云端(duan)(duan)服務(wu)(wu)并未開(kai)放外部端(duan)(duan)口,因(yin)此想到(dao)了 k8s 的 portforward 功能(neng)。此前(qian)也(ye)只是在(zai)書本或(huo)文章(zhang)中獲知有此功能(neng),未深入(ru)了解(jie)和(he)使用(yong)。趁著(zhu)這次機會,對 portforward 進行了一番研究,與大家分(fen)析一下自己的歷(li)程。

portforward 的調用流程(cheng)大概如下:

首(shou)先會在本地通過kubectl開啟一(yi)個使用spdy協(xie)議的(de)(de) reset server,把本地的(de)(de)請求(qiu)發往k8s對(dui)應的(de)(de)pod的(de)(de)子資源 portforward。

func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
	transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
	// 此處省略無關代碼
        // ...
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
	fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
	// 此處省略無關代碼
        // ...
	return fw.ForwardPorts()
}
// 此處省略無關代碼
// ...
// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
	pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
	// 此處省略無關代碼
        // ...
	req := o.RESTClient.Post().
		Resource("pods").
		Namespace(o.Namespace).
		Name(pod.Name).
		SubResource("portforward")
	return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
}

注(zhu):kubectl 源(yuan)碼片斷(duan)(kubectl/pkg/cmd/portforward/portforward.go)

請求到了k8s api server 端(duan),

// Connect returns a handler for the pod portforward proxy
func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
	portForwardOpts, ok := opts.(*api.PodPortForwardOptions)
	//...
	location, transport, err := pod.PortForwardLocation(ctx, r.Store, r.KubeletConn, name, portForwardOpts)
	//...
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}

func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
	handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
	handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
	return handler
}

注:k8s 源碼片斷(duan)(kubectl/pkg/cmd/portforward/portforward.go)

在kubelet server中監聽/portForward,找到對應的pod,將請求(qiu)代理給它。

// getPortForward handles a new restful port forward request. It determines the
// pod name and uid and then calls ServePortForward.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
	params := getPortForwardRequestParams(request)
	portForwardOptions, err := portforward.NewV4Options(request.Request)
	//...
	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
	//...
	url, err := s.host.GetPortForward(request.Request.Context(), pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
	//...
	proxyStream(response.ResponseWriter, request.Request, url)
}

注:k8s 源(yuan)碼(ma)片(pian)斷(kubernetes/pkg/kubelet/server/server.go)

同時(shi),容器運行時(shi)cri要實(shi)現PortForward接口,到此(ci),就將本地(di)的請求轉發到了目標(biao)pod了。

// PortForwarder knows how to forward content from a data stream to/from a port in a pod.
type PortForwarder interface {
	// PortForwarder copies data between a data stream and a port in a pod.
	PortForward(ctx context.Context, name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
}

// ServePortForward handles a port forwarding request.  A single request is
// kept alive as long as the client is still alive and the connection has not
// been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl //localhost:8888/` requests will be
// handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
	var err error
	if wsstream.IsWebSocketRequest(req) {
		err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
	} else {
		err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
	}

	if err != nil {
		runtime.HandleError(err)
		return
	}
}

注:k8s 源碼片(pian)斷(kubernetes/pkg/kubelet/cri/streaming/portforward/portforward.go)

此至,整(zheng)個portforward的流程代碼在心里有(you)了(le)一個清晰的調用鏈流程了(le)。

0條評論
0 / 1000
杜****中
5文章(zhang)數
0粉絲(si)數
杜****中
5 文章 | 0 粉絲
杜****中
5文章數(shu)
0粉絲數
杜****中
5 文章 | 0 粉(fen)絲
原(yuan)創(chuang)

淺析k8s的portforward

2023-06-30 06:03:23
95
0

最近開發中,本地服務需要grpc調用開發環境的(de)(de)(de)云(yun)端服務進(jin)(jin)行(xing)調試,云(yun)端服務并未(wei)開放(fang)外(wai)部端口,因此想到(dao)了(le) k8s 的(de)(de)(de) portforward 功(gong)能。此前也只是在書本或文(wen)章中獲知有(you)此功(gong)能,未(wei)深入了(le)解和使用。趁(chen)著這次(ci)機會,對 portforward 進(jin)(jin)行(xing)了(le)一番研究,與大家分析一下(xia)自己的(de)(de)(de)歷程。

portforward 的調用流(liu)程(cheng)大概(gai)如下(xia):

首先會在本地(di)通過(guo)kubectl開啟一個使(shi)用spdy協(xie)議的 reset server,把本地(di)的請(qing)求發(fa)往k8s對應的pod的子資(zi)源(yuan) portforward。

func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
	transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
	// 此處省略無關代碼
        // ...
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
	fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
	// 此處省略無關代碼
        // ...
	return fw.ForwardPorts()
}
// 此處省略無關代碼
// ...
// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
	pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
	// 此處省略無關代碼
        // ...
	req := o.RESTClient.Post().
		Resource("pods").
		Namespace(o.Namespace).
		Name(pod.Name).
		SubResource("portforward")
	return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
}

注:kubectl 源碼片(pian)斷(kubectl/pkg/cmd/portforward/portforward.go)

請求到了k8s api server 端,

// Connect returns a handler for the pod portforward proxy
func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
	portForwardOpts, ok := opts.(*api.PodPortForwardOptions)
	//...
	location, transport, err := pod.PortForwardLocation(ctx, r.Store, r.KubeletConn, name, portForwardOpts)
	//...
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}

func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
	handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
	handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
	return handler
}

注(zhu):k8s 源碼(ma)片斷(kubectl/pkg/cmd/portforward/portforward.go)

在kubelet server中監聽/portForward,找到(dao)對應的pod,將請求代理給它。

// getPortForward handles a new restful port forward request. It determines the
// pod name and uid and then calls ServePortForward.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
	params := getPortForwardRequestParams(request)
	portForwardOptions, err := portforward.NewV4Options(request.Request)
	//...
	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
	//...
	url, err := s.host.GetPortForward(request.Request.Context(), pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
	//...
	proxyStream(response.ResponseWriter, request.Request, url)
}

注(zhu):k8s 源(yuan)碼片斷(kubernetes/pkg/kubelet/server/server.go)

同時,容器運行時cri要實現(xian)PortForward接口,到(dao)(dao)此(ci),就將本(ben)地的請求轉發到(dao)(dao)了目標pod了。

// PortForwarder knows how to forward content from a data stream to/from a port in a pod.
type PortForwarder interface {
	// PortForwarder copies data between a data stream and a port in a pod.
	PortForward(ctx context.Context, name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
}

// ServePortForward handles a port forwarding request.  A single request is
// kept alive as long as the client is still alive and the connection has not
// been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl //localhost:8888/` requests will be
// handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
	var err error
	if wsstream.IsWebSocketRequest(req) {
		err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
	} else {
		err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
	}

	if err != nil {
		runtime.HandleError(err)
		return
	}
}

注:k8s 源(yuan)碼片斷(duan)(kubernetes/pkg/kubelet/cri/streaming/portforward/portforward.go)

此(ci)至(zhi),整(zheng)個portforward的流程代碼在心里有了一個清晰的調用鏈流程了。

文章來自個人專欄
文章 | 訂(ding)閱
0條評論
0 / 1000
請輸入你的評論
0
0