Reconfiguration demo (cmu-db#47)
**Summary**: Created a basic demo for reconfiguring Postgres and added tests for reconfiguration.

**Demo**:


https://github.com/user-attachments/assets/2e9bc151-0d8e-41e4-a5d8-33860fe21f27


**Details**:
* Refactored `restart_with_changes()` to take an `Optional[dict[str,
str]]` instead of an `Optional[list[tuple[str, str]]]`, so that no knob
can appear more than once (see the first sketch after this list).
* Added unit tests for `restart_with_changes()`, including edge cases
around calling it multiple times and whether it modifies its input
params.
* Figured out the difference between `@st.cache_resource` and
`st.session_state` (with detailed comments in `tune/demo/main.py`). We
need the former to prevent `DBGymConfig` from being created multiple
times and the latter to persist configuration changes across
`st.rerun()` calls (see the second sketch after this list).
wangpatrick57 authored Nov 14, 2024
1 parent 4f9e0e6 commit c1f162d
Showing 12 changed files with 188 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -51,7 +51,7 @@ jobs:
# Integration tests do require external systems to be running (most commonly a database instance).
# Unlike end-to-end tests though, they test a specific module in a detailed manner, much like a unit test does.
env:
# We set `INTENDED_DBDATA_HARDWARE` so that it's seen when `integtest_pg_conn.py` executes `./tune/env/set_up_env_integtests.sh`.
# We set `INTENDED_DBDATA_HARDWARE` so that it's seen when `integtest_pg_conn.py` executes `./env/set_up_env_integtests.sh`.
INTENDED_DBDATA_HARDWARE: ssd
run: |
. "$HOME/.cargo/env"
Empty file added env/__init__.py
File renamed without changes.
74 changes: 66 additions & 8 deletions tune/env/integtest_pg_conn.py → env/integtest_pg_conn.py
@@ -1,10 +1,11 @@
import copy
import subprocess
import unittest
from pathlib import Path

import yaml

from tune.env.pg_conn import PostgresConn
from env.pg_conn import PostgresConn
from util.pg import (
DEFAULT_POSTGRES_PORT,
get_is_postgres_running,
@@ -18,7 +19,7 @@
default_pristine_dbdata_snapshot_path,
)

ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("tune/env/env_integtests_dbgym_config.yaml")
ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("env/env_integtests_dbgym_config.yaml")
BENCHMARK = "tpch"
SCALE_FACTOR = 0.01

@@ -36,14 +37,14 @@ class PostgresConnTests(unittest.TestCase):
def setUpClass() -> None:
# If you're running the test locally, this check makes runs past the first one much faster.
if not get_unittest_workspace_path().exists():
subprocess.run(["./tune/env/set_up_env_integtests.sh"], check=True)
subprocess.run(["./env/set_up_env_integtests.sh"], check=True)

PostgresConnTests.dbgym_cfg = DBGymConfig(ENV_INTEGTESTS_DBGYM_CONFIG_FPATH)

def setUp(self) -> None:
self.assertFalse(
get_is_postgres_running(),
"Make sure Postgres isn't running before starting the integration test. `pkill postgres` is one way"
"Make sure Postgres isn't running before starting the integration test. `pkill postgres` is one way "
+ "to ensure this. Be careful about accidentally taking down other people's Postgres instances though.",
)
self.pristine_dbdata_snapshot_path = default_pristine_dbdata_snapshot_path(
@@ -74,18 +75,18 @@ def test_init(self) -> None:
def test_start_and_stop(self) -> None:
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.start_with_changes()
pg_conn.restart_postgres()
self.assertTrue(get_is_postgres_running())
pg_conn.shutdown_postgres()

def test_start_on_multiple_ports(self) -> None:
pg_conn0 = self.create_pg_conn()
pg_conn0.restore_pristine_snapshot()
pg_conn0.start_with_changes()
pg_conn0.restart_postgres()
self.assertEqual(set(get_running_postgres_ports()), {DEFAULT_POSTGRES_PORT})
pg_conn1 = self.create_pg_conn(DEFAULT_POSTGRES_PORT + 1)
pg_conn1.restore_pristine_snapshot()
pg_conn1.start_with_changes()
pg_conn1.restart_postgres()
self.assertEqual(
set(get_running_postgres_ports()),
{DEFAULT_POSTGRES_PORT, DEFAULT_POSTGRES_PORT + 1},
@@ -99,7 +100,7 @@ def test_connect_and_disconnect(self) -> None:
# Setup
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.start_with_changes()
pg_conn.restart_postgres()

# Test
self.assertIsNone(pg_conn._conn)
@@ -115,6 +116,63 @@ def test_connect_and_disconnect(self) -> None:
# Cleanup
pg_conn.shutdown_postgres()

def test_start_with_changes(self) -> None:
# Setup
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.restart_postgres()

# Test
initial_sysknobs = pg_conn.get_system_knobs()
self.assertEqual(initial_sysknobs["wal_buffers"], "4MB")
pg_conn.restart_with_changes({"wal_buffers": "8MB"})
new_sysknobs = pg_conn.get_system_knobs()
self.assertEqual(new_sysknobs["wal_buffers"], "8MB")

# Cleanup
pg_conn.shutdown_postgres()

def test_multiple_start_with_changes(self) -> None:
# Setup
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.restart_postgres()

# Test
initial_sysknobs = pg_conn.get_system_knobs()

# First call
self.assertEqual(initial_sysknobs["wal_buffers"], "4MB")
pg_conn.restart_with_changes({"wal_buffers": "8MB"})
new_sysknobs = pg_conn.get_system_knobs()
self.assertEqual(new_sysknobs["wal_buffers"], "8MB")

# Second call
self.assertEqual(initial_sysknobs["enable_nestloop"], "on")
pg_conn.restart_with_changes({"enable_nestloop": "off"})
new_sysknobs = pg_conn.get_system_knobs()
self.assertEqual(new_sysknobs["enable_nestloop"], "off")
# The changes are not additive: "wal_buffers" should have reset to 4MB.
self.assertEqual(new_sysknobs["wal_buffers"], "4MB")

# Cleanup
pg_conn.shutdown_postgres()

def test_start_with_changes_doesnt_modify_input(self) -> None:
# Setup
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.restart_postgres()

# Test
conf_changes = {"wal_buffers": "8MB"}
orig_conf_changes = copy.deepcopy(conf_changes)
pg_conn.restart_with_changes(conf_changes)
self.assertEqual(conf_changes, orig_conf_changes)

# Cleanup
pg_conn.shutdown_postgres()


if __name__ == "__main__":
unittest.main()
56 changes: 45 additions & 11 deletions tune/env/pg_conn.py → env/pg_conn.py
@@ -129,27 +129,40 @@ def shutdown_postgres(self) -> None:
if not exists and retcode != 0:
break

def start_with_changes(
def restart_postgres(self) -> bool:
# TODO: check that we still get the shared preload libraries correctly when conf_changes is None.
return self.restart_with_changes(conf_changes=None)

def restart_with_changes(
self,
conf_changes: Optional[list[str]] = None,
conf_changes: Optional[dict[str, str]],
dump_page_cache: bool = False,
save_checkpoint: bool = False,
) -> bool:
"""
This function is called "(re)start" because it also shuts down Postgres before starting it.
This function assumes that some snapshot has already been untarred into self.dbdata_dpath.
You can do this by calling one of the wrappers around _restore_snapshot().
Note that multiple calls are not "additive". Calling this will restart from the latest saved
snapshot. If you want it to be additive without the overhead of saving a snapshot, pass in
multiple changes to `conf_changes`.
"""
# Install the new configuration changes.
if conf_changes is not None:
if SHARED_PRELOAD_LIBRARIES:
# This way of doing it works for both single or multiple libraries. An example of a way
# that *doesn't* work is `f"shared_preload_libraries='"{SHARED_PRELOAD_LIBRARIES}"'"`
conf_changes.append(
f"shared_preload_libraries='{SHARED_PRELOAD_LIBRARIES}'"
)
dbdata_auto_conf_path = self.dbdata_dpath / "postgresql.auto.conf"
with open(dbdata_auto_conf_path, "w") as f:
f.write("\n".join(conf_changes))
f.write(
"\n".join([f"{knob} = {val}" for knob, val in conf_changes.items()])
+ "\n"
)

assert (
"shared_preload_libraries" not in conf_changes
), "You should not set shared_preload_libraries manually."

# Using single quotes around SHARED_PRELOAD_LIBRARIES works for both single and multiple libraries.
f.write(f"shared_preload_libraries = '{SHARED_PRELOAD_LIBRARIES}'")

# Start postgres instance.
self.shutdown_postgres()
@@ -295,7 +308,16 @@ def _set_up_boot(
logging.getLogger(DBGYM_LOGGER_NAME).debug("Set up boot")

def psql(self, sql: str) -> tuple[int, Optional[str]]:
low_sql = sql.lower()
"""
Execute a SQL command (equivalent to psql -C "[cmd]") and return a status code and its stderr.
This is meant for commands that modify the database, not those that get information from the database, which
is why it doesn't return a Cursor with the result. I designed it this way because it's difficult to provide
a general-purpose API which returns results for arbitrary SQL queries as those results could be very large.
A return code of 0 means success while a non-zero return code means failure. The stderr will be None if success
and a string if failure.
"""

def cancel_fn(conn_str: str) -> None:
with psycopg.connect(
@@ -350,6 +372,18 @@ def cancel_fn(conn_str: str) -> None:
self.disconnect()
return 0, None

def get_system_knobs(self) -> dict[str, str]:
"""
System knobs are those applied across the entire system. They do not include table-specific
knobs, query-specific knobs (aka query hints), or indexes.
"""
conn = self.conn()
result = conn.execute("SHOW ALL").fetchall()
knobs = {}
for row in result:
knobs[row[0]] = row[1]
return knobs

def restore_pristine_snapshot(self) -> bool:
return self._restore_snapshot(self.pristine_dbdata_snapshot_fpath)

@@ -381,4 +415,4 @@ def _restore_snapshot(
>> f"{self.dbdata_dpath}/postgresql.conf"
)()

return self.start_with_changes(conf_changes=None)
return self.restart_postgres()
tune/env/set_up_env_integtests.sh → env/set_up_env_integtests.sh
@@ -12,7 +12,7 @@ set -euxo pipefail
INTENDED_DBDATA_HARDWARE="${INTENDED_DBDATA_HARDWARE:-hdd}"
BENCHMARK=tpch
SCALE_FACTOR=0.01
export DBGYM_CONFIG_PATH=tune/env/env_integtests_dbgym_config.yaml # Note that this envvar needs to be exported.
export DBGYM_CONFIG_PATH=env/env_integtests_dbgym_config.yaml # Note that this envvar needs to be exported.
WORKSPACE_PATH=$(grep 'dbgym_workspace_path:' $DBGYM_CONFIG_PATH | sed 's/dbgym_workspace_path: //')

python3 task.py benchmark $BENCHMARK data $SCALE_FACTOR
2 changes: 2 additions & 0 deletions scripts/run_demo.sh
@@ -1,2 +1,4 @@
#!/bin/bash
# You may need to run `pkill python` to fully restart the streamlit server. If you don't, objects cached
# with @st.cache_resource may persist even after you Ctrl-C and rerun ./scripts/run_demo.sh.
python -m streamlit run tune/demo/main.py
65 changes: 58 additions & 7 deletions tune/demo/main.py
@@ -1,6 +1,6 @@
import streamlit as st

from tune.env.pg_conn import PostgresConn
from env.pg_conn import PostgresConn
from util.pg import DEFAULT_POSTGRES_PORT, get_is_postgres_running
from util.workspace import (
DEFAULT_BOOT_CONFIG_FPATH,
@@ -12,9 +12,28 @@
)


# This ensures that DBGymConfig is only created once. Check DBGymConfig.__init__() for why we must do this.
# The rationale behind this code is very subtle. I'll first go over streamlit concepts before describing why this function exists.
#
# First, in streamlit, there are three kinds of "script reruns". These are ordered from least to most "disruptive":
# 1. st.rerun(). Will reset any local variables but will not reset st.session_state.
# 2. Reloading the browser page (perhaps if you changed some code). Will reset local vars and st.session_state but not things
# cached with @st.cache_resource.
# 3. Restarting the streamlit server. If you're running the server locally, you can restart it by doing Ctrl-C, `pkill python`,
# and then `streamlit run ...` (or `./scripts/run_demo.sh`). Will reset local vars, st.session_state, and things cached with
# @st.cache_resource, but will not reset things persisted to disk (though we currently don't persist anything to disk). Doing
# `pkill python` is critical here to actually reset the things cached with @st.cache_resource.
#
# Next, DBGymConfig has a safeguard where it can only be created once per instance of the Python interpreter. If you just put it
# in st.session_state, it would get re-created when you reloaded the browser page, causing it to trip the assertion that checks
# DBGymConfig.num_times_created_this_run == 1. Thus, we use @st.cache_resource to avoid this.
#
# I considered modifying num_times_created_this_run to instead be num_active_instances and doing `num_active_instances -= 1` in
# DBGymConfig.__del__(). However, streamlit doesn't actually destroy objects when you reload the browser page; it only destroys
# objects when you restart the streamlit server.
#
# If you modify the code of DBGymConfig, you will need to fully restart the streamlit server for those changes to be propagated.
@st.cache_resource
def make_dbgym_cfg() -> DBGymConfig:
def make_dbgym_cfg_cached() -> DBGymConfig:
return make_standard_dbgym_cfg()


@@ -23,7 +42,7 @@ class Demo:
SCALE_FACTOR = 0.01

def __init__(self) -> None:
self.dbgym_cfg = make_dbgym_cfg()
self.dbgym_cfg = make_dbgym_cfg_cached()
self.pristine_dbdata_snapshot_path = default_pristine_dbdata_snapshot_path(
self.dbgym_cfg.dbgym_workspace_path, Demo.BENCHMARK, Demo.SCALE_FACTOR
)
@@ -41,21 +60,53 @@ def __init__(self) -> None:
DEFAULT_BOOT_CONFIG_FPATH,
)

def _get_categorized_system_knobs(self) -> tuple[dict[str, str], dict[str, str]]:
IMPORTANT_KNOBS = {"shared_buffers", "enable_nestloop"}
all_knobs = self.pg_conn.get_system_knobs()
important_knobs = {
knob: val for knob, val in all_knobs.items() if knob in IMPORTANT_KNOBS
}
unimportant_knobs = {
knob: val for knob, val in all_knobs.items() if knob not in IMPORTANT_KNOBS
}
return important_knobs, unimportant_knobs

def main(self) -> None:
is_postgres_running = get_is_postgres_running()

if is_postgres_running:
st.write("Postgres is running")
st.write("Postgres is RUNNING")

if st.button("Stop Postgres"):
self.pg_conn.shutdown_postgres()
st.rerun()

with st.form("reconfig", clear_on_submit=True, enter_to_submit=False):
knob = st.text_input("Knob", placeholder="Enter text here...")
val = st.text_input("Value", placeholder="Enter text here...")
submit_button = st.form_submit_button("Reconfigure")
if submit_button:
if knob != "" and val != "":
if "conf_changes" not in st.session_state:
st.session_state.conf_changes = dict()

# By using st.session_state, we persist changes across st.rerun() (though not across reloading the browser).
st.session_state.conf_changes[knob] = val
self.pg_conn.restart_with_changes(st.session_state.conf_changes)
st.rerun()

important_knobs, unimportant_knobs = self._get_categorized_system_knobs()
with st.expander("Important knobs", expanded=True):
st.write(important_knobs)

with st.expander("Other knobs", expanded=False):
st.write(unimportant_knobs)
else:
st.write("Postgres is not running")
st.write("Postgres is STOPPED")

if st.button("Start Postgres"):
self.pg_conn.restore_pristine_snapshot()
self.pg_conn.start_with_changes()
self.pg_conn.restart_postgres()
st.rerun()


4 changes: 0 additions & 4 deletions tune/env/__init__.py

This file was deleted.

4 changes: 2 additions & 2 deletions tune/protox/agent/build_trial.py
@@ -16,7 +16,7 @@
from torch import nn
from torch.optim import Adam # type: ignore[attr-defined]

from tune.env.pg_conn import PostgresConn
from env.pg_conn import PostgresConn
from tune.protox.agent.agent_env import AgentEnv
from tune.protox.agent.buffers import ReplayBuffer
from tune.protox.agent.noise import ClampNoise
@@ -163,7 +163,7 @@ def _build_utilities(
artifact_manager=artifact_manager,
)

# If we're using Boot, PostgresConn.start_with_changes() assumes that Redis is running. Thus,
# If we're using Boot, PostgresConn.restart_postgres() assumes that Redis is running. Thus,
# we start Redis here if necessary.
enable_boot = hpo_params["enable_boot"][str(tuning_mode)]
if enable_boot: