Skip to content

Commit

Permalink
Aggregate OpenAPI spec
Browse files Browse the repository at this point in the history
  • Loading branch information
mbohlool committed Aug 1, 2017
1 parent 400b77b commit 8c0580d
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 8 deletions.
1 change: 0 additions & 1 deletion cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command

// the aggregator doesn't wire these up. It just delegates them to the kubeapiserver
genericConfig.EnableSwaggerUI = false
genericConfig.OpenAPIConfig = nil
genericConfig.SwaggerConfig = nil

// copy the etcd options so we don't mutate originals.
Expand Down
31 changes: 27 additions & 4 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ type APIAggregator struct {

// Information needed to determine routing for the aggregator
serviceResolver ServiceResolver

openAPIAggregator *openAPIAggregator
}

type completedConfig struct {
Expand All @@ -142,6 +144,11 @@ func (c *Config) SkipComplete() completedConfig {

// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
// Prevent generic API server to install OpenAPI handler. Aggregator server
// has its own customized OpenAPI handler.
openApiConfig := c.Config.GenericConfig.OpenAPIConfig
c.Config.GenericConfig.OpenAPIConfig = nil

genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
Expand Down Expand Up @@ -212,17 +219,29 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil
})

if openApiConfig != nil {
s.openAPIAggregator, err = buildAndRegisterOpenAPIAggregator(
s.delegateHandler,
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
openApiConfig,
s.GenericAPIServer.Handler.NonGoRestfulMux,
s.contextMapper)
if err != nil {
return nil, err
}
}

return s, nil
}

// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so its ok to run the controller on a single thread
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) error {
// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
// since they are wired against listers because they require multiple resources to respond
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
proxyHandler.updateAPIService(apiService)
return
return s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService)
}

proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
Expand All @@ -241,18 +260,21 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
serviceResolver: s.serviceResolver,
}
proxyHandler.updateAPIService(apiService)
if err := s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService); err != nil {
return err
}
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

// if we're dealing with the legacy group, we're done here
if apiService.Name == legacyAPIServiceName {
return
return nil
}

// if we've already registered the path with the handler, we don't want to do it again.
if s.handledGroups.Has(apiService.Spec.Group) {
return
return nil
}

// it's time to register the group aggregation endpoint
Expand All @@ -268,6 +290,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
s.handledGroups.Insert(apiService.Spec.Group)
return nil
}

// RemoveAPIService removes the APIService from being handled. It is not thread-safe, so only call it on one thread at a time please.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
)

type APIHandlerManager interface {
AddAPIService(apiService *apiregistration.APIService)
AddAPIService(apiService *apiregistration.APIService) error
RemoveAPIService(apiServiceName string)
}

Expand Down Expand Up @@ -102,8 +102,7 @@ func (c *APIServiceRegistrationController) sync(key string) error {
return nil
}

c.apiHandlerManager.AddAPIService(apiService)
return nil
return c.apiHandlerManager.AddAPIService(apiService)
}

func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
Expand Down
270 changes: 270 additions & 0 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/openapi_aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiserver

import (
"encoding/json"
"fmt"
"net/http"
"sort"
"time"

"github.com/emicklei/go-restful"
"github.com/go-openapi/spec"

"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-openapi/pkg/aggregator"
"k8s.io/kube-openapi/pkg/builder"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
)

const (
aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second
)

type openAPIAggregator struct {
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo

// provided for dynamic OpenAPI spec
openAPIService *handler.OpenAPIService

// Aggregator's OpenAPI spec (holds apiregistration group).
aggregatorOpenAPISpec *spec.Swagger

// Local (in process) delegate's OpenAPI spec.
inProcessDelegatesOpenAPISpec *spec.Swagger

contextMapper request.RequestContextMapper
}

func buildAndRegisterOpenAPIAggregator(delegateHandler http.Handler, webServices []*restful.WebService, config *common.Config, pathHandler common.PathHandler, contextMapper request.RequestContextMapper) (s *openAPIAggregator, err error) {
s = &openAPIAggregator{
openAPISpecs: map[string]*openAPISpecInfo{},
contextMapper: contextMapper,
}

// Get Local delegate's Spec
s.inProcessDelegatesOpenAPISpec, err = s.downloadOpenAPISpec(delegateHandler)
if err != nil {
return nil, err
}

// Build Aggregator's spec
s.aggregatorOpenAPISpec, err = builder.BuildOpenAPISpec(
webServices, config)
if err != nil {
return nil, err
}
// Remove any non-API endpoints from aggregator's spec. aggregatorOpenAPISpec
// is the source of truth for all non-api endpoints.
aggregator.FilterSpecByPaths(s.aggregatorOpenAPISpec, []string{"/apis/"})

// Build initial spec to serve.
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return nil, err
}

// Install handler
s.openAPIService, err = handler.RegisterOpenAPIService(
specToServe, "/swagger.json", pathHandler)
if err != nil {
return nil, err
}

return s, nil
}

// openAPISpecInfo is used to store OpenAPI spec with its priority.
// It can be used to sort specs with their priorities.
type openAPISpecInfo struct {
apiService apiregistration.APIService
spec *spec.Swagger
}

// byPriority can be used in sort.Sort to sort specs with their priorities.
type byPriority struct {
specs []openAPISpecInfo
groupPriorities map[string]int32
}

func (a byPriority) Len() int { return len(a.specs) }
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
func (a byPriority) Less(i, j int) bool {
var iPriority, jPriority int32
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
iPriority = a.specs[i].apiService.Spec.VersionPriority
jPriority = a.specs[i].apiService.Spec.VersionPriority
} else {
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
}
if iPriority != jPriority {
// Sort by priority, higher first
return iPriority > jPriority
}
// Sort by service name.
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}

func sortByPriority(specs []openAPISpecInfo) {
b := byPriority{
specs: specs,
groupPriorities: map[string]int32{},
}
for _, spec := range specs {
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
}
}
sort.Sort(b)
}

// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe.
func (s *openAPIAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
specToReturn, err = aggregator.CloneSpec(s.inProcessDelegatesOpenAPISpec)
if err != nil {
return nil, err
}
if err := aggregator.MergeSpecs(specToReturn, s.aggregatorOpenAPISpec); err != nil {
return nil, fmt.Errorf("cannot merge local delegate spec with aggregator spec: %s", err.Error())
}
specs := []openAPISpecInfo{}
for _, specInfo := range s.openAPISpecs {
specs = append(specs, openAPISpecInfo{specInfo.apiService, specInfo.spec})
}
sortByPriority(specs)
for _, specInfo := range specs {
if err := aggregator.MergeSpecs(specToReturn, specInfo.spec); err != nil {
return nil, err
}
}
return specToReturn, nil
}

// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe.
func (s *openAPIAggregator) updateOpenAPISpec() error {
if s.openAPIService == nil {
return nil
}
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return err
}
return s.openAPIService.UpdateSpec(specToServe)
}

// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
header http.Header
respCode int
data []byte
}

func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}

func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}

func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.respCode = code
}

func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
r.data = append(r.data, in...)
return len(in), nil
}

func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}

func (s *openAPIAggregator) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if ctx, ok := s.contextMapper.Get(req); ok {
s.contextMapper.Update(req, request.WithUser(ctx, info))
}
handler.ServeHTTP(w, req)
})
}

// downloadOpenAPISpec downloads openAPI spec from /swagger.json endpoint of the given handler.
func (s *openAPIAggregator) downloadOpenAPISpec(handler http.Handler) (*spec.Swagger, error) {
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
handler = request.WithRequestContext(handler, s.contextMapper)
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")

req, err := http.NewRequest("GET", "/swagger.json", nil)
if err != nil {
return nil, err
}
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)

switch writer.respCode {
case http.StatusOK:
openApiSpec := &spec.Swagger{}
if err := json.Unmarshal(writer.data, openApiSpec); err != nil {
return nil, err
}
return openApiSpec, nil
default:
return nil, fmt.Errorf("failed to retrive openAPI spec, http error: %s", writer.String())
}
}

// loadApiServiceSpec loads OpenAPI spec for the given API Service and then updates aggregator's spec.
func (s *openAPIAggregator) loadApiServiceSpec(handler http.Handler, apiService *apiregistration.APIService) error {

// Ignore local services
if apiService.Spec.Service == nil {
return nil
}

openApiSpec, err := s.downloadOpenAPISpec(handler)
if err != nil {
return err
}
aggregator.FilterSpecByPaths(openApiSpec, []string{"/apis/" + apiService.Spec.Group + "/"})

s.openAPISpecs[apiService.Name] = &openAPISpecInfo{
apiService: *apiService,
spec: openApiSpec,
}

err = s.updateOpenAPISpec()
if err != nil {
delete(s.openAPISpecs, apiService.Name)
return err
}
return nil
}
Loading

0 comments on commit 8c0580d

Please sign in to comment.