Commit

Merge branch 'master' into ingest-worker-pool
travisturner authored Jul 26, 2019
2 parents e044712 + c1f8216 commit 1af4219
Showing 4 changed files with 80 additions and 32 deletions.
docs/pdk.md (23 changes: 12 additions & 11 deletions)

@@ -38,17 +38,17 @@ the record to arrive at that field. For example:

 This JSON object would result in the following Pilosa schema:
 
-| Field          | Type   | Min | Max        | Size   |
-|----------------|--------|-----|------------|--------|
-| name           | ranked |     |            | 100000 |
-| favorite_foods | ranked |     |            | 100000 |
-| default        | ranked |     |            | 100000 |
-| age            | int    | 0   | 2147483647 |        |
-| location       | ranked |     |            | 1000   |
-| latitude       | int    | 0   | 2147483647 |        |
-| longitude      | int    | 0   | 2147483647 |        |
-| location-city  | ranked |     |            | 100000 |
-| location-state | ranked |     |            | 100000 |
+| Field          | Example Value | Type   | Cache Size |
+|----------------|---------------|--------|------------|
+| name           | "jill"        | ranked | 100000     |
+| favorite_foods | "corn chips"  | ranked | 100000     |
+| default        |               | ranked | 100000     |
+| age            | 27            | int    |            |
+| location       |               | ranked | 1000       |
+| latitude       | 3754          | int    |            |
+| longitude      | 4526          | int    |            |
+| location-city  | "Austin"      | ranked | 100000     |
+| location-state | "Texas"       | ranked | 100000     |
 
 All set fields are created as ranked fields by default, with the cache size
 listed above. Integer fields are created with a minimum size of zero and a

@@ -66,6 +66,7 @@ control over the way data is indexed, and ingestion performance.
 * `--subject-path`: If nothing is passed for this option, then each record will be assigned a unique sequential column ID. If `subject-path` is specified, then the value at this path in the record will be mapped to a column ID. If the same value appears in another record, the same column ID will be used.
 * `--proxy`: The PDK ingests data, but also keeps a mapping from string values to row IDs, and from subjects to column IDs. Because of this, querying Pilosa directly may not be useful, since it only returns integer row and column IDs. The PDK will start a proxy server which intercepts requests to Pilosa that use strings for row and column IDs, and translates them to the integers that Pilosa understands. It will also translate responses so that (e.g.) a TopN query will return `{"results":[[{"Key":"chipotle dip","Count":1},{"Key":"corn chips","Count":1}]]}`. By default, the mapping is stored in an embedded LevelDB.

+For more information on running `pdk kafka` and how Pilosa interfaces with Kafka, please see the [kafka directory](https://github.com/pilosa/pdk/tree/master/kafka) in the pdk repository.
 
 ### Library
 
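The subject-to-column and value-to-row mapping described under `--subject-path` and `--proxy` comes down to handing out a stable sequential ID per distinct string. Below is a minimal in-memory sketch of that idea; the type and method names are hypothetical, and the PDK itself persists this mapping in an embedded LevelDB rather than in memory:

```go
package main

import "fmt"

// translator assigns a stable, sequential uint64 ID to each distinct
// string key, mirroring how the PDK maps subjects to column IDs and
// string values to row IDs. Hypothetical names, for illustration only.
type translator struct {
	ids  map[string]uint64
	keys []string
}

func newTranslator() *translator {
	return &translator{ids: make(map[string]uint64)}
}

// ID returns the ID for key, allocating the next sequential ID the
// first time a key is seen. Repeated keys map to the same ID.
func (t *translator) ID(key string) uint64 {
	if id, ok := t.ids[key]; ok {
		return id
	}
	id := uint64(len(t.keys))
	t.ids[key] = id
	t.keys = append(t.keys, key)
	return id
}

// Key translates an ID back to its string, as the proxy does when it
// rewrites Pilosa responses (e.g. TopN results) for the client.
func (t *translator) Key(id uint64) string {
	return t.keys[id]
}

func main() {
	t := newTranslator()
	fmt.Println(t.ID("corn chips"))   // 0
	fmt.Println(t.ID("chipotle dip")) // 1
	fmt.Println(t.ID("corn chips"))   // 0: same value, same ID
	fmt.Println(t.Key(1))             // chipotle dip
}
```
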
executor.go (13 changes: 10 additions & 3 deletions)

@@ -1512,14 +1512,21 @@ func (e *executor) executeRowShard(ctx context.Context, index string, c *pql.Call
 	}
 
 	// Union bitmaps across all time-based views.
-	row := &Row{}
-	for _, view := range viewsByTimeRange(viewStandard, fromTime, toTime, q) {
+	views := viewsByTimeRange(viewStandard, fromTime, toTime, q)
+	rows := make([]*Row, 0, len(views))
+	for _, view := range views {
 		f := e.Holder.fragment(index, fieldName, view, shard)
 		if f == nil {
 			continue
 		}
-		row = row.Union(f.row(rowID))
+		rows = append(rows, f.row(rowID))
 	}
+	if len(rows) == 0 {
+		return &Row{}, nil
+	} else if len(rows) == 1 {
+		return rows[0], nil
+	}
+	row := rows[0].Union(rows[1:]...)
 	f.Stats.Count("range", 1, 1.0)
 	return row, nil

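The change above replaces a pairwise fold (calling `row = row.Union(f.row(rowID))` on every iteration, which materializes an intermediate `Row` per view) with a single n-way union over the collected rows, returning early when there are zero rows or only one. A rough standalone sketch of the two shapes, using a hypothetical `set` type in place of Pilosa's roaring-backed `Row`:

```go
package main

import "fmt"

// set is a stand-in for a Pilosa row; the real code unions roaring
// bitmaps, not maps. Hypothetical type, for illustration only.
type set map[uint64]struct{}

// union merges any number of sets in one pass, the shape the new
// executor code uses: rows[0].Union(rows[1:]...).
func union(first set, others ...set) set {
	out := make(set, len(first))
	for k := range first {
		out[k] = struct{}{}
	}
	for _, o := range others {
		for k := range o {
			out[k] = struct{}{}
		}
	}
	return out
}

func main() {
	rows := []set{
		{1: {}, 2: {}},
		{2: {}, 3: {}},
		{4: {}},
	}
	// Old shape: fold pairwise, building an intermediate result per step.
	acc := set{}
	for _, r := range rows {
		acc = union(acc, r)
	}
	// New shape: a single n-way union, one intermediate result total.
	merged := union(rows[0], rows[1:]...)
	fmt.Println(len(acc), len(merged)) // 4 4
}
```
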
executor_test.go (15 changes: 12 additions & 3 deletions)

@@ -2920,6 +2920,7 @@ func TestExecutor_Execute_ClearRow(t *testing.T) {
 			Set(2, f=10, 2001-01-01T00:00)`
 		readQueries := []string{
 			`Row(f=1, from=1999-12-31T00:00, to=2003-01-01T03:00)`,
+			`Row(f=1, from=2002-01-01T00:00, to=2002-01-02T00:00)`,
 			`ClearRow(f=1)`,
 			`Row(f=1, from=1999-12-31T00:00, to=2003-01-01T03:00)`,
 			`Row(f=10, from=1999-12-31T00:00, to=2003-01-01T03:00)`,

@@ -2931,20 +2932,26 @@ func TestExecutor_Execute_ClearRow(t *testing.T) {
 			t.Fatalf("unexpected columns: %+v", columns)
 		}
 
+		// Single-day query (regression test).
+		if columns := responses[1].Results[0].(*pilosa.Row).Columns(); !reflect.DeepEqual(columns, []uint64{7}) {
+			t.Fatalf("unexpected columns: %+v", columns)
+		}
+
 		// Clear the row and ensure we get a `true` response.
-		if res := responses[1].Results[0].(bool); !res {
+		if res := responses[2].Results[0].(bool); !res {
 			t.Fatalf("unexpected clear row result: %+v", res)
 		}
 
 		// Ensure the row is empty.
-		if columns := responses[2].Results[0].(*pilosa.Row).Columns(); !reflect.DeepEqual(columns, []uint64{}) {
+		if columns := responses[3].Results[0].(*pilosa.Row).Columns(); !reflect.DeepEqual(columns, []uint64{}) {
 			t.Fatalf("unexpected columns: %+v", columns)
 		}
 
 		// Ensure other rows were not affected.
-		if columns := responses[3].Results[0].(*pilosa.Row).Columns(); !reflect.DeepEqual(columns, []uint64{2}) {
+		if columns := responses[4].Results[0].(*pilosa.Row).Columns(); !reflect.DeepEqual(columns, []uint64{2}) {
 			t.Fatalf("unexpected columns: %+v", columns)
 		}
+
 	})
 
 	t.Run("Int", func(t *testing.T) {
@@ -3292,6 +3299,7 @@ func TestExecutor_Execute_RowsTime(t *testing.T) {
 		`Rows(f)`,
 		`Rows(f, from=2002-01-01T00:00)`,
 		`Rows(f, to=2003-02-03T00:00)`,
+		`Rows(f, from=2002-01-01T00:00, to=2002-01-02T00:00)`,
 	}
 	expResults := [][]uint64{
 		{1},

@@ -3300,6 +3308,7 @@ func TestExecutor_Execute_RowsTime(t *testing.T) {
 		{1, 2, 3, 4, 13},
 		{2, 3, 4, 13},
 		{1, 2, 3, 13},
+		{2},
 	}
 
 	responses := runCallTest(t, writeQuery, readQueries,
row.go (61 changes: 46 additions & 15 deletions)

@@ -150,21 +150,48 @@ func (r *Row) Xor(other *Row) *Row {
 }
 
 // Union returns the bitwise union of r and other.
-func (r *Row) Union(other *Row) *Row {
-	var segments []rowSegment
-	itr := newMergeSegmentIterator(r.segments, other.segments)
-	for s0, s1 := itr.next(); s0 != nil || s1 != nil; s0, s1 = itr.next() {
-		if s1 == nil {
-			segments = append(segments, *s0)
-			continue
-		} else if s0 == nil {
-			segments = append(segments, *s1)
-			continue
+func (r *Row) Union(others ...*Row) *Row {
+	segments := make([][]rowSegment, 0, len(others)+1)
+	if len(r.segments) > 0 {
+		segments = append(segments, r.segments)
+	}
+	nextSegs := make([][]rowSegment, 0, len(others)+1)
+	toProcess := make([]*rowSegment, 0, len(others)+1)
+	var output []rowSegment
+	for _, other := range others {
+		if len(other.segments) > 0 {
+			segments = append(segments, other.segments)
 		}
-		segments = append(segments, *s0.Union(s1))
 	}
-
-	return &Row{segments: segments}
+	for len(segments) > 0 {
+		shard := segments[0][0].shard
+		for _, segs := range segments {
+			if segs[0].shard < shard {
+				shard = segs[0].shard
+			}
+		}
+		nextSegs = nextSegs[:0]
+		toProcess = toProcess[:0]
+		for _, segs := range segments {
+			if segs[0].shard == shard {
+				toProcess = append(toProcess, &segs[0])
+				segs = segs[1:]
+			}
+			if len(segs) > 0 {
+				nextSegs = append(nextSegs, segs)
+			}
+		}
+		// At this point, toProcess holds every head segment sharing the
+		// lowest shard, and nextSegs holds all the remaining segment
+		// lists. Swap the two slices so we don't have to reallocate them.
+		segments, nextSegs = nextSegs, segments
+		if len(toProcess) == 1 {
+			output = append(output, *toProcess[0])
+		} else {
+			output = append(output, *toProcess[0].Union(toProcess[1:]...))
+		}
+	}
+	return &Row{segments: output}
 }
 
 // Difference returns the diff of r and other.
@@ -350,8 +377,12 @@ func (s *rowSegment) Intersect(other *rowSegment) *rowSegment {
 }
 
 // Union returns the bitwise union of s and other.
-func (s *rowSegment) Union(other *rowSegment) *rowSegment {
-	data := s.data.Union(other.data)
+func (s *rowSegment) Union(others ...*rowSegment) *rowSegment {
+	datas := make([]*roaring.Bitmap, len(others))
+	for i, other := range others {
+		datas[i] = other.data
+	}
+	data := s.data.Union(datas...)
 	data.Freeze()
 
 	return &rowSegment{
