Skip to content

Commit

Permalink
Add jsonschema as a list format
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jul 30, 2024
1 parent e51c523 commit ff6ed7a
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 50 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.34.0 - TBD

### Added

- The `list` subcommand now supports the format `jsonschema`. (@Jeffail)

## 4.33.0 - 2024-07-19

### Added
Expand Down
2 changes: 1 addition & 1 deletion internal/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Either run {{.ProductName}} as a stream processor or choose a command:
{{.BinaryName}} list inputs
{{.BinaryName}} create kafka//file > ./config.yaml
{{.BinaryName}} run ./config.yaml
{{.BinaryName}} run -r "./production/*.yaml" -c ./config.yaml`)[1:],
{{.BinaryName}} run -r "./production/*.yaml" ./config.yaml`)[1:],
Flags: flags,
Before: func(c *cli.Context) error {
opts.RootCommonFlagsExtract(c)
Expand Down
9 changes: 8 additions & 1 deletion internal/cli/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
"github.com/redpanda-data/benthos/v4/internal/cli/common"
"github.com/redpanda-data/benthos/v4/internal/config/schema"
"github.com/redpanda-data/benthos/v4/internal/cuegen"
"github.com/redpanda-data/benthos/v4/internal/jsonschema"
)

func listCliCommand(opts *common.CLIOpts) *cli.Command {
flags := []cli.Flag{
&cli.StringFlag{
Name: "format",
Value: "text",
Usage: "Print the component list in a specific format. Options are text, json or cue.",
Usage: "Print the component list in a specific format. Options are text, json, jsonschema, or cue.",
},
&cli.StringFlag{
Name: "status",
Expand Down Expand Up @@ -125,6 +126,12 @@ func listComponents(c *cli.Context, opts *common.CLIOpts) {
panic(err)
}
fmt.Fprintln(opts.Stdout, string(jsonBytes))
case "jsonschema":
jsonSchemaBytes, err := jsonschema.Marshal(schema.Config, opts.Environment)
if err != nil {
panic(err)
}
fmt.Fprintln(opts.Stdout, string(jsonSchemaBytes))
case "cue":
source, err := cuegen.GenerateSchema(schema)
if err != nil {
Expand Down
61 changes: 61 additions & 0 deletions internal/jsonschema/stream_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package jsonschema

import (
"encoding/json"

"github.com/redpanda-data/benthos/v4/internal/bundle"
"github.com/redpanda-data/benthos/v4/internal/docs"
)

func compSpecsToDefinition(specs []docs.ComponentSpec, typeFields map[string]docs.FieldSpec) map[string]any {
generalFields := map[string]any{}
for k, v := range typeFields {
generalFields[k] = v.JSONSchema()
}

var componentDefs []any
for _, s := range specs {
componentDefs = append(componentDefs, map[string]any{
"type": "object",
"properties": map[string]any{
s.Name: s.Config.JSONSchema(),
},
})
}

return map[string]any{
"allOf": []any{
map[string]any{
"anyOf": componentDefs, // TODO: Convert this to oneOf once issues are resolved.
},
map[string]any{
"type": "object",
"properties": generalFields,
},
},
}
}

// Marshal attempts to marshal a JSON Schema definition containing the entire
// config and plugin ecosystem such that other applications can potentially
// execute their own linting and generation tools with it.
func Marshal(fields docs.FieldSpecs, env *bundle.Environment) ([]byte, error) {
defs := map[string]any{
"input": compSpecsToDefinition(env.InputDocs(), docs.ReservedFieldsByType(docs.TypeInput)),
"buffer": compSpecsToDefinition(env.BufferDocs(), docs.ReservedFieldsByType(docs.TypeBuffer)),
"cache": compSpecsToDefinition(env.CacheDocs(), docs.ReservedFieldsByType(docs.TypeCache)),
"processor": compSpecsToDefinition(env.ProcessorDocs(), docs.ReservedFieldsByType(docs.TypeProcessor)),
"rate_limit": compSpecsToDefinition(env.RateLimitDocs(), docs.ReservedFieldsByType(docs.TypeRateLimit)),
"output": compSpecsToDefinition(env.OutputDocs(), docs.ReservedFieldsByType(docs.TypeOutput)),
"metrics": compSpecsToDefinition(env.MetricsDocs(), docs.ReservedFieldsByType(docs.TypeMetrics)),
"tracer": compSpecsToDefinition(env.TracersDocs(), docs.ReservedFieldsByType(docs.TypeTracer)),
"scanner": compSpecsToDefinition(env.ScannerDocs(), docs.ReservedFieldsByType(docs.TypeScanner)),
}

schemaObj := map[string]any{
"properties": fields.JSONSchema(),
"definitions": defs,
}

return json.Marshal(schemaObj)
}
74 changes: 74 additions & 0 deletions internal/jsonschema/stream_schema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package jsonschema_test

import (
"errors"
"testing"

"github.com/stretchr/testify/require"

jsonschema "github.com/xeipuuv/gojsonschema"

"github.com/redpanda-data/benthos/v4/internal/bundle"
"github.com/redpanda-data/benthos/v4/internal/component/input"
"github.com/redpanda-data/benthos/v4/internal/component/processor"
"github.com/redpanda-data/benthos/v4/internal/config"
"github.com/redpanda-data/benthos/v4/internal/docs"
ijschema "github.com/redpanda-data/benthos/v4/internal/jsonschema"

_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

func testEnvWithPlugins(t testing.TB) *bundle.Environment {
t.Helper()

env := bundle.GlobalEnvironment.Clone()

require.NoError(t, env.InputAdd(func(c input.Config, nm bundle.NewManagement) (input.Streamed, error) {
return nil, errors.New("nope")
}, docs.ComponentSpec{
Name: "testinput",
Type: docs.TypeInput,
Config: docs.FieldComponent().WithChildren(
docs.FieldString("woof", "", "WOOF"),
),
}))

require.NoError(t, env.ProcessorAdd(func(conf processor.Config, mgr bundle.NewManagement) (processor.V1, error) {
return nil, errors.New("nope")
}, docs.ComponentSpec{
Name: "testprocessor",
Type: docs.TypeProcessor,
Config: docs.FieldComponent().WithChildren(
docs.FieldBloblang("mapfield", ""),
),
}))

return env
}

func TestJSONSchema(t *testing.T) {
env := testEnvWithPlugins(t)

testSchema, err := ijschema.Marshal(config.Spec(), env)
require.NoError(t, err)

schema, err := jsonschema.NewSchema(jsonschema.NewStringLoader(string(testSchema)))
require.NoError(t, err)

res, err := schema.Validate(jsonschema.NewGoLoader(map[string]any{
"input": map[string]any{
"testinput": map[string]any{
"woof": "uhhhhh, woof!",
},
"processors": []any{
map[string]any{
"testprocessor": map[string]any{
"mapfield": "hello world",
},
},
},
},
}))
require.NoError(t, err)
require.Empty(t, res.Errors())
}
50 changes: 2 additions & 48 deletions public/service/stream_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/redpanda-data/benthos/v4/internal/config/schema"
"github.com/redpanda-data/benthos/v4/internal/docs"
"github.com/redpanda-data/benthos/v4/internal/filepath/ifs"
"github.com/redpanda-data/benthos/v4/internal/jsonschema"
"github.com/redpanda-data/benthos/v4/internal/stream"
"github.com/redpanda-data/benthos/v4/internal/template"
"github.com/redpanda-data/benthos/v4/public/bloblang"
Expand Down Expand Up @@ -213,58 +214,11 @@ func (s *ConfigSchema) MarshalJSONV0() ([]byte, error) {
return json.Marshal(iSchema)
}

func compSpecsToDefinition(specs []docs.ComponentSpec, typeFields map[string]docs.FieldSpec) map[string]any {
generalFields := map[string]any{}
for k, v := range typeFields {
generalFields[k] = v.JSONSchema()
}

var componentDefs []any
for _, s := range specs {
componentDefs = append(componentDefs, map[string]any{
"type": "object",
"properties": map[string]any{
s.Name: s.Config.JSONSchema(),
},
})
}

return map[string]any{
"allOf": []any{
map[string]any{
"anyOf": componentDefs, // TODO: Convert this to oneOf once issues are resolved.
},
map[string]any{
"type": "object",
"properties": generalFields,
},
},
}
}

// MarshalJSONSchema attempts to marshal a JSON Schema definition containing the
// entire config and plugin ecosystem such that other applications can
// potentially execute their own linting and generation tools with it.
func (s *ConfigSchema) MarshalJSONSchema() ([]byte, error) {
s.env.internal.BufferDocs()
defs := map[string]any{
"input": compSpecsToDefinition(s.env.internal.InputDocs(), docs.ReservedFieldsByType(docs.TypeInput)),
"buffer": compSpecsToDefinition(s.env.internal.BufferDocs(), docs.ReservedFieldsByType(docs.TypeBuffer)),
"cache": compSpecsToDefinition(s.env.internal.CacheDocs(), docs.ReservedFieldsByType(docs.TypeCache)),
"processor": compSpecsToDefinition(s.env.internal.ProcessorDocs(), docs.ReservedFieldsByType(docs.TypeProcessor)),
"rate_limit": compSpecsToDefinition(s.env.internal.RateLimitDocs(), docs.ReservedFieldsByType(docs.TypeRateLimit)),
"output": compSpecsToDefinition(s.env.internal.OutputDocs(), docs.ReservedFieldsByType(docs.TypeOutput)),
"metrics": compSpecsToDefinition(s.env.internal.MetricsDocs(), docs.ReservedFieldsByType(docs.TypeMetrics)),
"tracer": compSpecsToDefinition(s.env.internal.TracersDocs(), docs.ReservedFieldsByType(docs.TypeTracer)),
"scanner": compSpecsToDefinition(s.env.internal.ScannerDocs(), docs.ReservedFieldsByType(docs.TypeScanner)),
}

schemaObj := map[string]any{
"properties": s.fields.JSONSchema(),
"definitions": defs,
}

return json.Marshal(schemaObj)
return jsonschema.Marshal(s.fields, s.env.internal)
}

// SetVersion sets the version and date-built stamp associated with the schema.
Expand Down

0 comments on commit ff6ed7a

Please sign in to comment.