Skip to content

Commit

Permalink
Update flow active time when the result set is completed (Velocidex#1468
Browse files Browse the repository at this point in the history
)

With the MemcacheFileStore, writes are delayed so they may be
combined. Previously the collection context was written out before the
full result set was committed to storage causing a refresh issue in
the gui - the gui would not update when the collection completed and
show an empty result set.
  • Loading branch information
scudette authored Jan 10, 2022
1 parent 4c4bc52 commit 80cbd91
Show file tree
Hide file tree
Showing 15 changed files with 520 additions and 300 deletions.
18 changes: 9 additions & 9 deletions api/proto/api.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

501 changes: 257 additions & 244 deletions config/proto/config.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions config/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,9 @@ message DatastoreConfig {
// filesystems (default 100).
int64 memcache_write_mutation_writers = 6;

// How long to delay writes so they can be combined (default 1 sec)
int64 memcache_write_mutation_max_age = 9;

// Experimental - do not set in configs yet!
string minion_implementation = 7;
string master_implementation = 8;
Expand Down
14 changes: 9 additions & 5 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@ func GetDB(config_obj *config_proto.Config) (DataStore, error) {
return nil, errors.New("no datastore configured")
}

return getImpl(config_obj.Datastore.Implementation, config_obj)
implementation, err := GetImplementationName(config_obj)
if err != nil {
return nil, err
}

return getImpl(implementation)
}

func getImpl(implementation string,
config_obj *config_proto.Config) (DataStore, error) {
func getImpl(implementation string) (DataStore, error) {
switch implementation {
case "FileBaseDataStore":
return file_based_imp, nil
Expand Down Expand Up @@ -154,7 +158,7 @@ func getImpl(implementation string,

default:
return nil, errors.New("no datastore implementation " +
config_obj.Datastore.Implementation)
implementation)
}
}

Expand All @@ -164,6 +168,6 @@ func SetGlobalDatastore(
ds_mu.Lock()
defer ds_mu.Unlock()

g_impl, err = getImpl(implementation, config_obj)
g_impl, err = getImpl(implementation)
return err
}
34 changes: 34 additions & 0 deletions datastore/memcache_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"www.velocidex.com/golang/velociraptor/file_store/path_specs"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/utils"
"www.velocidex.com/golang/velociraptor/vtesting"
)

Expand Down Expand Up @@ -133,6 +134,39 @@ func (self MemcacheFileTestSuite) TestListChildren() {
assert.Equal(self.T(), children[0].AsClientPath(), "/a/b")
}

func (self MemcacheFileTestSuite) TestSetSubjectAndListChildren() {
db, ok := self.datastore.(*MemcacheFileDataStore)
assert.True(self.T(), ok)

// Setting the data ends up on the filesystem
client_id := "C.1234"
client_record := &api_proto.ClientMetadata{
ClientId: client_id,
}

// Write the file to the filesystem
urn := path_specs.NewSafeDatastorePath("a", "b")
err := file_based_imp.SetSubject(self.config_obj, urn, client_record)
assert.NoError(self.T(), err)

urn2 := path_specs.NewSafeDatastorePath("a", "d")
err = file_based_imp.SetSubject(self.config_obj, urn2, client_record)
assert.NoError(self.T(), err)

// Now set a file in an existing directory.
intermediate := path_specs.NewSafeDatastorePath("a", "e")
new_record := &api_proto.ClientMetadata{}
err = db.SetSubject(self.config_obj, intermediate, new_record)
assert.NoError(self.T(), err)

// Now list the memcache
first_level := path_specs.NewSafeDatastorePath("a")
children, err := db.ListChildren(self.config_obj, first_level)
assert.NoError(self.T(), err)
assert.Equal(self.T(), len(children), 3)
utils.Debug(children)
}

func TestMemCacheFileDatastore(t *testing.T) {
suite.Run(t, &MemcacheFileTestSuite{BaseTestSuite: BaseTestSuite{
datastore: NewMemcacheFileDataStore(),
Expand Down
23 changes: 23 additions & 0 deletions datastore/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datastore

import (
"errors"
"sync"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -74,3 +75,25 @@ func Walk(config_obj *config_proto.Config,

return nil
}

func GetImplementationName(
config_obj *config_proto.Config) (string, error) {
if config_obj.Datastore == nil {
return "", errors.New("Invalid datastore config")
}

if config_obj.Frontend == nil {
return config_obj.Datastore.Implementation, nil
}

if config_obj.Frontend.IsMinion &&
config_obj.Datastore.MinionImplementation != "" {
return config_obj.Datastore.MinionImplementation, nil
}

if config_obj.Datastore.MasterImplementation != "" {
return config_obj.Datastore.MasterImplementation, nil
}

return config_obj.Datastore.Implementation, nil
}
15 changes: 13 additions & 2 deletions file_store/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/datastore"
"www.velocidex.com/golang/velociraptor/file_store/accessors"
"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/velociraptor/file_store/directory"
Expand Down Expand Up @@ -54,7 +55,12 @@ func GetFileStore(config_obj *config_proto.Config) api.FileStore {
return nil
}

res, _ := getImpl(config_obj.Datastore.Implementation, config_obj)
implementation, err := datastore.GetImplementationName(config_obj)
if err != nil {
panic(err)
}

res, _ := getImpl(implementation, config_obj)
return res
}

Expand Down Expand Up @@ -100,7 +106,12 @@ func GetFileStoreFileSystemAccessor(
return nil, errors.New("Datastore not configured")
}

switch config_obj.Datastore.Implementation {
implementation, err := datastore.GetImplementationName(config_obj)
if err != nil {
return nil, err
}

switch implementation {

case "MemcacheFileDataStore":
return accessors.NewFileStoreFileSystemAccessor(
Expand Down
6 changes: 5 additions & 1 deletion file_store/memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ func NewMemcacheFileStore(config_obj *config_proto.Config) *MemcacheFileStore {
data_cache: ttlcache.NewCache(),
}

result.data_cache.SetTTL(5 * time.Second)
ttl := config_obj.Datastore.MemcacheWriteMutationMaxAge
if ttl == 0 {
ttl = 1
}
result.data_cache.SetTTL(time.Duration(ttl) * time.Second)

result.data_cache.SetNewItemCallback(func(key string, value interface{}) {
metricDataLRU.Inc()
Expand Down
7 changes: 6 additions & 1 deletion file_store/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/datastore"
"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/velociraptor/file_store/directory"
"www.velocidex.com/golang/velociraptor/file_store/memory"
Expand All @@ -18,8 +19,12 @@ func GetQueueManager(config_obj *config_proto.Config) (api.QueueManager, error)
}

file_store := GetFileStore(config_obj)
implementation, err := datastore.GetImplementationName(config_obj)
if err != nil {
return nil, err
}

switch config_obj.Datastore.Implementation {
switch implementation {

// For now everyone uses an in-memory queue manager.
case "Test":
Expand Down
6 changes: 6 additions & 0 deletions flows/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ func GetFlowDetails(
return nil, err
}

ping := &flows_proto.PingContext{}
err = db.GetSubject(config_obj, flow_path_manager.Ping(), ping)
if err == nil && ping.ActiveTime > collection_context.ActiveTime {
collection_context.ActiveTime = ping.ActiveTime
}

availableDownloads, _ := availableDownloadFiles(config_obj, client_id, flow_id)
return &api_proto.FlowDetails{
Context: collection_context,
Expand Down
32 changes: 31 additions & 1 deletion flows/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func NewCollectionContext(config_obj *config_proto.Config) *CollectionContext {
self.mu.Lock()
defer self.mu.Unlock()

// Mark the collection as updated.
updateContext(config_obj, self.ClientId, self.SessionId)

if !self.send_update {
return
}
Expand All @@ -120,12 +123,34 @@ func NewCollectionContext(config_obj *config_proto.Config) *CollectionContext {
journal.PushRowsToArtifactAsync(
config_obj, row, "System.Flow.Completion")
}

})

return self
}

// Flush the context object to disk. This must happen AFTER all data
// is written
func updateContext(
config_obj *config_proto.Config,
client_id, flow_id string) error {

db, err := datastore.GetDB(config_obj)
if err != nil {
return err
}

ping_record := &flows_proto.PingContext{
ActiveTime: uint64(time.Now().UnixNano() / 1000),
}

flow_path_manager := paths.NewFlowPathManager(client_id, flow_id)

// Just a blind write.
return db.SetSubjectWithCompletion(
config_obj, flow_path_manager.Ping(),
ping_record, nil)
}

// closeContext is called after all messages from the clients are
// processed in this group. Client messages are sent in groups inside
// the same POST request. Most of the time they belong to the same
Expand Down Expand Up @@ -164,6 +189,7 @@ func closeContext(
collection_context.StartTime = uint64(time.Now().UnixNano() / 1000)
}

// Mark the flow as last active now.
collection_context.ActiveTime = uint64(time.Now().UnixNano() / 1000)

// Figure out if we will send a System.Flow.Completion after
Expand All @@ -183,6 +209,7 @@ func closeContext(

// Instruct the completion function to send the message.
collection_context.send_update = true
collection_context.Dirty = true
}

if len(collection_context.Logs) > 0 {
Expand All @@ -191,6 +218,7 @@ func closeContext(
if err != nil {
collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
collection_context.Status = err.Error()
collection_context.Dirty = true
}
}

Expand All @@ -200,6 +228,7 @@ func closeContext(
if err != nil {
collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
collection_context.Status = err.Error()
collection_context.Dirty = true
}
}

Expand All @@ -208,6 +237,7 @@ func closeContext(
if err != nil {
collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
collection_context.Status = err.Error()
collection_context.Dirty = true
}
}

Expand Down
Loading

0 comments on commit 80cbd91

Please sign in to comment.