Skip to content

Commit

Permalink
move root number + helpers into InMemoryView
Browse files Browse the repository at this point in the history
Summary:
This addresses most of the FIXMEs introduced by the prior diff by
moving the root number and tick counter into the synchronized internal view
state in the InMemoryView.

Helper methods have been added to the QueryableView to obtain the current
clock string and to obtain a pair representing the root number and tick
value.

I've introduced a couple more FIXMEs related to the clock strings that
I'm going to resolve in the next diff by adjusting the return value
from the query functions.

Reviewed By: farnz

Differential Revision: D4160844

fbshipit-source-id: 27908ee38325914f2be79f4c8c27b51df6da9b50
  • Loading branch information
wez authored and Facebook Github Bot committed Nov 15, 2016
1 parent 646ee5e commit 61ff1b0
Show file tree
Hide file tree
Showing 17 changed files with 104 additions and 118 deletions.
26 changes: 21 additions & 5 deletions InMemoryView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
#include "make_unique.h"
#include "InMemoryView.h"

// Each root gets a number that uniquely identifies it within the process. This
// helps avoid confusion if a root is removed and then added again.
static std::atomic<long> next_root_number{1};

namespace watchman {

InMemoryView::view::view(const w_string& root_path)
: root_dir(watchman::make_unique<watchman_dir>(root_path, nullptr)) {}
: root_dir(watchman::make_unique<watchman_dir>(root_path, nullptr)),
rootNumber(next_root_number++) {}

InMemoryView::InMemoryView(w_root_t* root, std::shared_ptr<Watcher> watcher)
: cookies_(root->cookies),
Expand All @@ -36,7 +41,7 @@ void InMemoryView::markFileChanged(
}

file->otime.timestamp = now.tv_sec;
file->otime.ticks = mostRecentTick_;
file->otime.ticks = view->mostRecentTick;

if (view->latest_file != file) {
// unlink from list
Expand Down Expand Up @@ -229,7 +234,7 @@ watchman_file* InMemoryView::getOrCreateChildFile(

file_ptr = watchman_file::make(file_name, dir);

file_ptr->ctime.ticks = mostRecentTick_;
file_ptr->ctime.ticks = view->mostRecentTick;
file_ptr->ctime.timestamp = now.tv_sec;

auto suffix = file_name.suffix();
Expand Down Expand Up @@ -548,8 +553,19 @@ bool InMemoryView::allFilesGenerator(
return result;
}

uint32_t InMemoryView::getMostRecentTickValue() const {
return mostRecentTick_.load();
ClockPosition InMemoryView::getMostRecentRootNumberAndTickValue() const {
auto view = view_.rlock();
return ClockPosition(view->rootNumber, view->mostRecentTick);
}

w_string InMemoryView::getCurrentClockString() const {
auto view = view_.rlock();
char clockbuf[128];
if (!clock_id_string(
view->rootNumber, view->mostRecentTick, clockbuf, sizeof(clockbuf))) {
throw std::runtime_error("clock string exceeded clockbuf size");
}
return w_string(clockbuf, W_STRING_UNICODE);
}

uint32_t InMemoryView::getLastAgeOutTickValue() const {
Expand Down
10 changes: 6 additions & 4 deletions InMemoryView.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ namespace watchman {

/** Keeps track of the state of the filesystem in-memory. */
struct InMemoryView : public QueryableView {
uint32_t getMostRecentTickValue() const override;
ClockPosition getMostRecentRootNumberAndTickValue() const override;
uint32_t getLastAgeOutTickValue() const override;
time_t getLastAgeOutTimeStamp() const override;
w_string getCurrentClockString() const override;

explicit InMemoryView(w_root_t* root, std::shared_ptr<Watcher> watcher);

Expand Down Expand Up @@ -81,6 +82,10 @@ struct InMemoryView : public QueryableView {
std::unordered_map<w_string, std::unique_ptr<file_list_head>> suffixes;

std::unique_ptr<watchman_dir> root_dir;
// The most recently observed tick value of an item in the view
uint32_t mostRecentTick{1};
/* root number */
uint32_t rootNumber{0};

explicit view(const w_string& root_path);

Expand Down Expand Up @@ -197,9 +202,6 @@ struct InMemoryView : public QueryableView {
};
watchman::Synchronized<crawl_state> crawlState_;

// The most recently observed tick value of an item in the view
std::atomic<uint32_t> mostRecentTick_{0};

uint32_t last_age_out_tick{0};
time_t last_age_out_timestamp{0};

Expand Down
4 changes: 0 additions & 4 deletions QueryableView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ bool QueryableView::allFilesGenerator(w_query*, struct w_query_ctx*, int64_t*)
return false;
}

uint32_t QueryableView::getMostRecentTickValue() const {
return 0;
}

uint32_t QueryableView::getLastAgeOutTickValue() const {
return 0;
}
Expand Down
4 changes: 3 additions & 1 deletion QueryableView.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct watchman_dir;
struct watchman_glob_tree;

namespace watchman {

class QueryableView : public std::enable_shared_from_this<QueryableView> {
public:
virtual ~QueryableView();
Expand Down Expand Up @@ -45,7 +46,8 @@ class QueryableView : public std::enable_shared_from_this<QueryableView> {
w_query* query,
struct w_query_ctx* ctx,
int64_t* num_walked) const;
virtual uint32_t getMostRecentTickValue() const;
virtual ClockPosition getMostRecentRootNumberAndTickValue() const = 0;
virtual w_string getCurrentClockString() const = 0;
virtual uint32_t getLastAgeOutTickValue() const;
virtual time_t getLastAgeOutTimeStamp() const;
virtual void ageOut(w_perf_t& sample, std::chrono::seconds minAge);
Expand Down
29 changes: 7 additions & 22 deletions clockspec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ void w_clockspec_eval(

since->is_timestamp = false;

auto position = root->inner.view->getMostRecentRootNumberAndTickValue();

if (spec->tag == w_cs_named_cursor) {
w_string cursor = spec->named_cursor.cursor;

Expand All @@ -121,7 +123,7 @@ void w_clockspec_eval(

// record the current tick value against the cursor so that we use that
// as the basis for a subsequent query.
cursors[cursor] = root->inner.view->getMostRecentTickValue();
cursors[cursor] = position.ticks;
}

w_log(
Expand All @@ -135,7 +137,7 @@ void w_clockspec_eval(
// spec->tag == w_cs_clock
if (spec->clock.start_time == proc_start_time &&
spec->clock.pid == proc_pid &&
spec->clock.root_number == root->inner.number) {
spec->clock.root_number == position.rootNumber) {
since->clock.is_fresh_instance =
spec->clock.ticks < root->inner.view->getLastAgeOutTickValue();
if (since->clock.is_fresh_instance) {
Expand Down Expand Up @@ -172,29 +174,12 @@ bool clock_id_string(uint32_t root_number, uint32_t ticks, char *buf,
return (size_t)res < bufsize;
}

// Renders the current clock id string to the supplied buffer.
// Must be called with the root locked. FIXME
static bool current_clock_id_string(
const std::shared_ptr<w_root_t>& root,
char* buf,
size_t bufsize) {
return clock_id_string(
root->inner.number,
root->inner.view->getMostRecentTickValue(),
buf,
bufsize);
}

/* Add the current clock value to the response.
* must be called with the root locked FIXME */
/* Add the current clock value to the response */
void annotate_with_clock(
const std::shared_ptr<w_root_t>& root,
json_ref& resp) {
char buf[128];

if (current_clock_id_string(root, buf, sizeof(buf))) {
resp.set("clock", typed_string_to_json(buf, W_STRING_UNICODE));
}
resp.set(
"clock", w_string_to_json(root->inner.view->getCurrentClockString()));
}

/* vim:ts=2:sw=2:et:
Expand Down
75 changes: 24 additions & 51 deletions cmds/state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ static void cmd_state_enter(
struct watchman_client* clientbase,
const json_ref& args) {
struct state_arg parsed;
char clockbuf[128];
json_ref response;
auto client = dynamic_cast<watchman_user_client*>(clientbase);

Expand Down Expand Up @@ -104,31 +103,24 @@ static void cmd_state_enter(
client->states[entry->id] = entry.get();
}

{ // FIXME lock
// Sample the clock buf for the subscription PDUs we're going to
// send
clock_id_string(
root->inner.number,
root->inner.view->getMostRecentTickValue(),
clockbuf,
sizeof(clockbuf));
}

// We successfully entered the state, this is our response to the
// state-enter command. We do this before we send the subscription
// PDUs in case CLIENT has active subscriptions for this root
response = make_response();

auto clock = w_string_to_json(root->inner.view->getCurrentClockString());

response.set({{"root", w_string_to_json(root->root_path)},
{"state-enter", w_string_to_json(parsed.name)},
{"clock", typed_string_to_json(clockbuf, W_STRING_UNICODE)}});
{"clock", json_ref(clock)}});
send_and_dispose_response(client, std::move(response));

// Broadcast about the state enter
{
auto payload = json_object(
{{"root", w_string_to_json(root->root_path)},
{"clock", typed_string_to_json(clockbuf, W_STRING_UNICODE)},
{"state-enter", w_string_to_json(parsed.name)}});
auto payload =
json_object({{"root", w_string_to_json(root->root_path)},
{"clock", std::move(clock)},
{"state-enter", w_string_to_json(parsed.name)}});
if (parsed.metadata) {
payload.set("metadata", json_ref(parsed.metadata));
}
Expand All @@ -137,26 +129,17 @@ static void cmd_state_enter(
}
W_CMD_REG("state-enter", cmd_state_enter, CMD_DAEMON, w_cmd_realpath_root)

static void leave_state(struct watchman_user_client *client,
struct watchman_client_state_assertion *assertion,
bool abandoned, json_t *metadata, const char *clockbuf) {
char buf[128];

if (!clockbuf) {
// FIXME lock
clock_id_string(
assertion->root->inner.number,
assertion->root->inner.view->getMostRecentTickValue(),
buf,
sizeof(buf));
clockbuf = buf;
}

static void leave_state(
struct watchman_user_client* client,
struct watchman_client_state_assertion* assertion,
bool abandoned,
json_t* metadata) {
// Broadcast about the state leave
auto payload =
json_object({{"root", w_string_to_json(assertion->root->root_path)},
{"clock", typed_string_to_json(clockbuf, W_STRING_UNICODE)},
{"state-leave", w_string_to_json(assertion->name)}});
auto payload = json_object(
{{"root", w_string_to_json(assertion->root->root_path)},
{"clock",
w_string_to_json(assertion->root->inner.view->getCurrentClockString())},
{"state-leave", w_string_to_json(assertion->name)}});
if (metadata) {
payload.set("metadata", json_ref(metadata));
}
Expand Down Expand Up @@ -194,7 +177,7 @@ void w_client_vacate_states(struct watchman_user_client *client) {

// This will delete the state from client->states and invalidate
// the iterator.
leave_state(client, assertion, true, NULL, NULL);
leave_state(client, assertion, true, nullptr);
}
}

Expand All @@ -206,7 +189,6 @@ static void cmd_state_leave(
// client can delete this assertion, and this function is only executed by
// the thread that owns this client.
struct watchman_client_state_assertion *assertion = NULL;
char clockbuf[128];
auto client = dynamic_cast<watchman_user_client*>(clientbase);
json_ref response;

Expand Down Expand Up @@ -247,27 +229,18 @@ static void cmd_state_leave(
}
}

{ // FIXME: lock
// Sample the clock buf for the subscription PDUs we're going to
// send
clock_id_string(
root->inner.number,
root->inner.view->getMostRecentTickValue(),
clockbuf,
sizeof(clockbuf));
}

// We're about to successfully leave the state, this is our response to the
// state-leave command. We do this before we send the subscription
// PDUs in case CLIENT has active subscriptions for this root
response = make_response();
response.set({{"root", w_string_to_json(root->root_path)},
{"state-leave", w_string_to_json(parsed.name)},
{"clock", typed_string_to_json(clockbuf, W_STRING_UNICODE)}});
response.set(
{{"root", w_string_to_json(root->root_path)},
{"state-leave", w_string_to_json(parsed.name)},
{"clock", w_string_to_json(root->inner.view->getCurrentClockString())}});
send_and_dispose_response(client, std::move(response));

// Notify and exit the state
leave_state(client, assertion, false, parsed.metadata, clockbuf);
leave_state(client, assertion, false, parsed.metadata);
}
W_CMD_REG("state-leave", cmd_state_leave, CMD_DAEMON, w_cmd_realpath_root)

Expand Down
15 changes: 9 additions & 6 deletions cmds/subscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ void watchman_client_subscription::processSubscription() {
bool drop = false;
w_string policy_name;

auto position = root->inner.view->getMostRecentRootNumberAndTickValue();

watchman::log(
watchman::DBG,
"sub=",
Expand All @@ -63,10 +65,10 @@ void watchman_client_subscription::processSubscription() {
", last=",
last_sub_tick,
" pending=",
root->inner.view->getMostRecentTickValue(),
position.ticks,
"\n");

if (last_sub_tick != root->inner.view->getMostRecentTickValue()) {
if (last_sub_tick != position.ticks) {
auto asserted_states = root->asserted_states.rlock();
if (!asserted_states->empty() && !drop_or_defer.empty()) {
// There are 1 or more states asserted and this subscription
Expand Down Expand Up @@ -101,9 +103,9 @@ void watchman_client_subscription::processSubscription() {

if (drop) {
// fast-forward over any notifications while in the drop state
last_sub_tick = root->inner.view->getMostRecentTickValue();
last_sub_tick = position.ticks;
query->since_spec =
w_clockspec_new_clock(root->inner.number, last_sub_tick);
w_clockspec_new_clock(position.rootNumber, last_sub_tick);
watchman::log(
watchman::DBG,
"dropping subscription notifications for ",
Expand Down Expand Up @@ -138,7 +140,8 @@ void watchman_client_subscription::processSubscription() {

if (executeQuery) {
w_run_subscription_rules(client.get(), this, root);
last_sub_tick = root->inner.view->getMostRecentTickValue();
last_sub_tick =
position.ticks; // FIXME take this from the subscription results
}
} else {
watchman::log(watchman::DBG, "subscription ", name, " is up to date\n");
Expand Down Expand Up @@ -372,7 +375,7 @@ static void cmd_subscribe(
resp.set("subscribe", json_ref(jname));

add_root_warnings_to_response(resp, root);
annotate_with_clock(root, resp);
annotate_with_clock(root, resp); // FIXME: take from subscription query
initial_subscription_results = build_subscription_results(sub.get(), root);

send_and_dispose_response(client, std::move(resp));
Expand Down
2 changes: 1 addition & 1 deletion listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ static void client_thread(std::shared_ptr<watchman_client> client) {
"Unilateral payload for sub ",
sub->name,
" ",
dumped,
dumped ? dumped : "<<MISSING!!>>",
"\n");
free(dumped);

Expand Down
9 changes: 4 additions & 5 deletions perf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,16 @@ void watchman_perf_sample::add_root_meta(
auto meta = json_object(
{{"path", w_string_to_json(root->root_path)},
{"recrawl_count", json_integer(root->recrawlInfo.rlock()->recrawlCount)},
{"number", json_integer(root->inner.number)},
{"case_sensitive", json_boolean(root->case_sensitive)}});

// During recrawl, the view may be re-assigned. Protect against
// reading a nullptr.
auto view = root->inner.view;
if (view) {
meta.set({
{"watcher", w_string_to_json(root->inner.view->getName())},
{"ticks", json_integer(view->getMostRecentTickValue())},
});
auto position = view->getMostRecentRootNumberAndTickValue();
meta.set({{"number", json_integer(position.rootNumber)},
{"ticks", json_integer(position.ticks)},
{"watcher", w_string_to_json(view->getName())}});
}

add_meta("root", std::move(meta));
Expand Down
Loading

0 comments on commit 61ff1b0

Please sign in to comment.