Skip to content

Commit

Permalink
feat(archive): Add support to filter list by labels. Closes argoproj#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Feb 17, 2020
1 parent 79f1337 commit d309d5c
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 100 deletions.
5 changes: 4 additions & 1 deletion cmd/argo/commands/archive/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (

func NewListCommand() *cobra.Command {
var (
output string
selector string
output string
)
var command = &cobra.Command{
Use: "list",
Expand All @@ -30,6 +31,7 @@ func NewListCommand() *cobra.Command {
resp, err := serviceClient.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{
ListOptions: &metav1.ListOptions{
FieldSelector: "metadata.namespace=" + namespace,
LabelSelector: selector,
},
})
errors.CheckError(err)
Expand Down Expand Up @@ -57,5 +59,6 @@ func NewListCommand() *cobra.Command {
},
}
command.Flags().StringVarP(&output, "output", "o", "wide", "Output format. One of: json|yaml|wide")
command.Flags().StringVarP(&selector, "selector", "l", "", "Selector (label query) to filter on, not including uninitialized ones")
return command
}
57 changes: 57 additions & 0 deletions persist/sqldb/archived_workflow_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sqldb

import (
"fmt"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"upper.io/db.v3"
)

// labelsClause converts a set of parsed label-selector requirements into a
// single SQL condition: the AND of one sub-condition per requirement.
// It returns an error if any requirement uses an operator that
// requirementToCondition does not support.
func labelsClause(t dbType, requirements labels.Requirements) (db.Compound, error) {
	clauses := make([]db.Compound, 0, len(requirements))
	for _, req := range requirements {
		clause, err := requirementToCondition(t, req)
		if err != nil {
			return nil, err
		}
		clauses = append(clauses, clause)
	}
	return db.And(clauses...), nil
}

// requirementToCondition translates one parsed label-selector requirement
// into a SQL condition on the archived-workflows table. Every operator maps
// to an (optionally negated) "exists" sub-query against the labels table,
// correlated on clustername and uid; only the trailing value test differs.
//
// Inputs are deliberately NOT sanitized/escaped: Kubernetes restricts label
// keys and values to a safe character set (alphanumerics plus '-', '_', '.')
// that cannot break out of the single-quoted SQL literals built here.
// https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
// https://kb.objectrocket.com/postgresql/casting-in-postgresql-570#string+to+integer+casting
func requirementToCondition(t dbType, r labels.Requirement) (db.Compound, error) {
	switch r.Operator() {
	case selection.Exists:
		return labelCondition(false, r.Key(), ""), nil
	case selection.DoesNotExist:
		return labelCondition(true, r.Key(), ""), nil
	case selection.Equals, selection.DoubleEquals:
		return labelCondition(false, r.Key(), fmt.Sprintf(" and value = '%s'", r.Values().List()[0])), nil
	case selection.NotEquals:
		return labelCondition(true, r.Key(), fmt.Sprintf(" and value = '%s'", r.Values().List()[0])), nil
	case selection.In:
		return labelCondition(false, r.Key(), fmt.Sprintf(" and value in ('%s')", strings.Join(r.Values().List(), "', '"))), nil
	case selection.NotIn:
		return labelCondition(true, r.Key(), fmt.Sprintf(" and value in ('%s')", strings.Join(r.Values().List(), "', '"))), nil
	case selection.GreaterThan, selection.LessThan:
		// Numeric comparison requires the operand to be an integer so it can
		// be interpolated unquoted and compared against cast(value as ...).
		i, err := strconv.Atoi(r.Values().List()[0])
		if err != nil {
			return nil, err
		}
		op := ">"
		if r.Operator() == selection.LessThan {
			op = "<"
		}
		return labelCondition(false, r.Key(), fmt.Sprintf(" and cast(value as %s) %s %d", t.intType(), op, i)), nil
	}
	return nil, fmt.Errorf("operation %v is not supported", r.Operator())
}

// labelCondition builds the shared "[not ]exists (select 1 from <labels table>
// where clustername = ... and uid = ... and name = '<key>'<valueTest>)"
// fragment used by every operator above.
func labelCondition(negated bool, key, valueTest string) db.Compound {
	prefix := ""
	if negated {
		prefix = "not "
	}
	return db.Raw(fmt.Sprintf("%sexists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s'%s)", prefix, archiveLabelsTableName, archiveTableName, archiveTableName, key, valueTest))
}
47 changes: 47 additions & 0 deletions persist/sqldb/archived_workflow_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package sqldb

import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
"upper.io/db.v3"
)

// Test_labelsClause verifies that each supported label-selector operator is
// translated into the expected (not-)exists sub-query. The Postgres/MySQL
// pairs differ only in the integer cast target ("int" vs "signed") used by
// the > and < operators.
func Test_labelsClause(t *testing.T) {
	tests := []struct {
		name         string
		dbType       dbType
		requirements labels.Requirements
		want         db.Compound
	}{
		{"Empty", Postgres, requirements(""), db.And()},
		{"DoesNotExist", Postgres, requirements("!foo"), db.And(db.Raw("not exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo')"))},
		{"Equals", Postgres, requirements("foo=bar"), db.And(db.Raw("exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and value = 'bar')"))},
		{"DoubleEquals", Postgres, requirements("foo==bar"), db.And(db.Raw("exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and value = 'bar')"))},
		{"In", Postgres, requirements("foo in (bar,baz)"), db.And(db.Raw("exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and value in ('bar', 'baz'))"))},
		{"NotEquals", Postgres, requirements("foo != bar"), db.And(db.Raw("not exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and value = 'bar')"))},
		{"NotIn", Postgres, requirements("foo notin (bar,baz)"), db.And(db.Raw("not exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and value in ('bar', 'baz'))"))},
		{"Exists", Postgres, requirements("foo"), db.And(db.Raw("exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo')"))},
		{"GreaterThanPostgres", Postgres, requirements("foo>2"), db.And(db.Raw("exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and cast(value as int) > 2)"))},
		{"GreaterThanMySQL", MySQL, requirements("foo>2"), db.And(db.Raw("exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and cast(value as signed) > 2)"))},
		{"LessThanPostgres", Postgres, requirements("foo<2"), db.And(db.Raw("exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and cast(value as int) < 2)"))},
		{"LessThanMySQL", MySQL, requirements("foo<2"), db.And(db.Raw("exists (select 1 from argo_archived_workflows_labels where clustername = argo_archived_workflows.clustername and uid = argo_archived_workflows.uid and name = 'foo' and cast(value as signed) < 2)"))},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := labelsClause(tt.dbType, tt.requirements)
			// Compare the rendered SQL fragments via Sentences() rather than
			// the compound values themselves.
			if assert.NoError(t, err) {
				assert.Equal(t, tt.want.Sentences(), got.Sentences())
			}
		})
	}
}

// requirements is a test helper that parses a label-selector string into its
// requirements, panicking on a malformed selector (all selectors in this
// file are fixed test inputs).
func requirements(selector string) []labels.Requirement {
	reqs, err := labels.ParseToRequirements(selector)
	if err != nil {
		panic(err)
	}
	return reqs
}
30 changes: 30 additions & 0 deletions persist/sqldb/db_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package sqldb

import (
"database/sql"

"github.com/go-sql-driver/mysql"
"upper.io/db.v3"
)

// dbType identifies which of the two supported SQL engines a session is
// backed by; migration and cast syntax differ between them.
type dbType string

const (
	MySQL    dbType = "mysql"
	Postgres dbType = "postgres"
)

// dbTypeFor inspects the driver behind an open session to determine the
// dialect. Anything that is not the MySQL driver is treated as Postgres.
func dbTypeFor(session db.Database) dbType {
	if _, ok := session.Driver().(*sql.DB).Driver().(*mysql.MySQLDriver); ok {
		return MySQL
	}
	return Postgres
}

// intType returns the target type name for an integer "cast(value as X)":
// MySQL requires "signed", Postgres uses "int".
func (t dbType) intType() string {
	switch t {
	case MySQL:
		return "signed"
	}
	return "int"
}
60 changes: 34 additions & 26 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package sqldb

import (
"context"
"database/sql"

"github.com/go-sql-driver/mysql"
log "github.com/sirupsen/logrus"
"upper.io/db.v3/lib/sqlbuilder"
)
Expand Down Expand Up @@ -57,11 +55,7 @@ func (m migrate) Exec(ctx context.Context) error {
return err
}
}
dbType := "postgres"
switch m.session.Driver().(*sql.DB).Driver().(type) {
case *mysql.MySQLDriver:
dbType = "mysql"
}
dbType := dbTypeFor(m.session)

log.WithFields(log.Fields{"clusterName": m.clusterName, "dbType": dbType}).Info("Migrating database schema")

Expand Down Expand Up @@ -91,18 +85,18 @@ func (m migrate) Exec(ctx context.Context) error {
primary key (id, namespace)
)`),
ansiSQLChange(`alter table argo_workflow_history rename to argo_archived_workflows`),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`drop index idx_name on `+m.tableName),
ansiSQLChange(`drop index idx_name`),
),
ansiSQLChange(`create unique index idx_name on ` + m.tableName + `(name, namespace)`),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` drop primary key`),
ansiSQLChange(`alter table `+m.tableName+` drop constraint `+m.tableName+`_pkey`),
),
ansiSQLChange(`alter table ` + m.tableName + ` add primary key(name,namespace)`),
// huh - why does the pkey not have the same name as the table - history
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows drop primary key`),
ansiSQLChange(`alter table argo_archived_workflows drop constraint argo_workflow_history_pkey`),
),
Expand All @@ -111,37 +105,37 @@ func (m migrate) Exec(ctx context.Context) error {
// THE CHANGES ABOVE THIS LINE MAY BE IN PER-PRODUCTION SYSTEMS - DO NOT CHANGE THEM
// ***
ansiSQLChange(`alter table argo_archived_workflows rename column id to uid`),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column uid varchar(128) not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column uid set not null`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column phase varchar(25) not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column phase set not null`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column namespace varchar(256) not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column namespace set not null`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column workflow text not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column workflow set not null`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column startedat timestamp not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column startedat set not null`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column finishedat timestamp not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column finishedat set not null`),
),
ansiSQLChange(`alter table argo_archived_workflows add clustername varchar(64)`), // DNS entry can only be max 63 bytes
ansiSQLChange(`update argo_archived_workflows set clustername = ` + m.clusterName + ` where clustername is null`),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column clustername varchar(64) not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column clustername set not null`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows drop primary key`),
ansiSQLChange(`alter table argo_archived_workflows drop constraint argo_archived_workflows_pkey`),
),
Expand All @@ -154,36 +148,36 @@ func (m migrate) Exec(ctx context.Context) error {
ansiSQLChange(`alter table ` + m.tableName + ` drop column startedat`),
ansiSQLChange(`alter table ` + m.tableName + ` drop column finishedat`),
ansiSQLChange(`alter table ` + m.tableName + ` rename column id to uid`),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` modify column uid varchar(128) not null`),
ansiSQLChange(`alter table `+m.tableName+` alter column uid set not null`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` modify column namespace varchar(256) not null`),
ansiSQLChange(`alter table `+m.tableName+` alter column namespace set not null`),
),
ansiSQLChange(`alter table ` + m.tableName + ` add column clustername varchar(64)`), // DNS cannot be longer than 64 bytes
ansiSQLChange(`update ` + m.tableName + ` set clustername = ` + m.clusterName + ` where clustername is null`),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` modify column clustername varchar(64) not null`),
ansiSQLChange(`alter table `+m.tableName+` alter column clustername set not null`),
),
ansiSQLChange(`alter table ` + m.tableName + ` add column version varchar(64)`),
ansiSQLChange(`alter table ` + m.tableName + ` add column nodes text`),
backfillNodes{tableName: m.tableName},
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` modify column nodes text not null`),
ansiSQLChange(`alter table `+m.tableName+` alter column nodes set not null`),
),
ansiSQLChange(`alter table ` + m.tableName + ` drop column workflow`),
// add a timestamp column to indicate updated time
ansiSQLChange(`alter table ` + m.tableName + ` add column updatedat timestamp not null default current_timestamp`),
// remove the old primary key and add a new one
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` drop primary key`),
ansiSQLChange(`alter table `+m.tableName+` drop constraint `+m.tableName+`_pkey`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`drop index idx_name on `+m.tableName),
ansiSQLChange(`drop index idx_name`),
),
Expand All @@ -192,16 +186,30 @@ func (m migrate) Exec(ctx context.Context) error {
ansiSQLChange(`create index ` + m.tableName + `_i1 on ` + m.tableName + ` (clustername,namespace)`),
// argo_workflows now looks like:
// clustername(not null) | uid(not null) | namespace(not null) | version(not null) | nodes(not null) | updatedat(not null)
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column workflow json not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column workflow type json using workflow::json`),
),
ternary(dbType == "mysql",
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column name varchar(256) not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column name set not null`),
),
// clustername(not null) | uid(not null) | | name (not null) | phase(not null) | namespace(not null) | workflow(not null) | startedat(not null) | finishedat(not null)
ansiSQLChange(`create index ` + m.tableName + `_i2 on ` + m.tableName + ` (clustername,namespace,updatedat)`),
// The argo_archived_workflows_labels is really provided as a way to create queries on labels that are fast because they
// use indexes. When displaying, it might be better to look at the `workflow` column.
// We could have added a `labels` column to argo_archived_workflows, but then we would have had to do free-text
// queries on it which would be slow due to having to table scan.
// The key has an optional prefix(253 chars) + '/' + name(63 chars)
// Why is the key called "name" not "key"? Key is an SQL reserved word.
ansiSQLChange(`create table if not exists argo_archived_workflows_labels (
clustername varchar(64) not null,
uid varchar(128) not null,
name varchar(317) not null,
value varchar(63) not null,
primary key (clustername, uid, name),
foreign key (clustername, uid) references argo_archived_workflows(clustername, uid) on delete cascade
)`),
} {
err := m.applyChange(ctx, changeSchemaVersion, change)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions persist/sqldb/mocks/WorkflowArchive.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion persist/sqldb/null_workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sqldb
import (
"fmt"

"k8s.io/apimachinery/pkg/labels"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

Expand All @@ -15,7 +17,7 @@ func (r *nullWorkflowArchive) ArchiveWorkflow(*wfv1.Workflow) error {
return nil
}

func (r *nullWorkflowArchive) ListWorkflows(string, int, int) (wfv1.Workflows, error) {
// ListWorkflows ignores all of its filter/paging arguments and always
// returns an empty workflow list with no error — presumably the no-op
// behavior for when archiving is disabled (NOTE(review): confirm against
// the WorkflowArchive interface contract).
func (r *nullWorkflowArchive) ListWorkflows(string, labels.Requirements, int, int) (wfv1.Workflows, error) {
	return wfv1.Workflows{}, nil
}

Expand Down
Loading

0 comments on commit d309d5c

Please sign in to comment.