Skip to content

Commit

Permalink
Add CLI modules for PostgreSQL and TPC-H. (cmu-db#15)
Browse files Browse the repository at this point in the history
This PR adds functionality for setting up the DBMS and obtaining
benchmark data for Proto-X.

Some highlighted commands:

- `benchmark tpch`
  - `generate-sf SF`: generate table data for the given scale factor.
  - `generate-workload WORKLOAD_NAME SEED_START SEED_END`: generate a
    named workload that has TPC-H queries seeded with [SEED_START,
    SEED_END] inclusive. Accepts options to control which queries are
    included.
  - `load-sf SF DBMS DBNAME`: loads the specified table data (and creates
    indexes) into the relevant database.
- `dbms postgres`
  - `clone`: obtain the source code for our build of the DBMS; also installs
    extensions.
  - `init-auth`: create default authentication from the root `config.yaml`
    specifications.
  - `init-db DBNAME`: create a new empty database with the specified name.
  - `init-pgdata`: (re-)initialize the PGDATA folder
  - `run-sql-file SQL_PATH`: run the specified SQL file
  - `start`: (re-)start the DBMS
  - `stop`: stop the DBMS

The file `./test/wan_test.sh` demonstrates a complete run that includes
(1) setting up the DBMS, (2) setting up TPC-H, (3) obtaining training
data for Proto-X embeddings, (4) selecting the best Proto-X embedding.
  • Loading branch information
lmwnshn authored Mar 11, 2024
1 parent 489c7f3 commit df6a166
Show file tree
Hide file tree
Showing 30 changed files with 883 additions and 65 deletions.
4 changes: 4 additions & 0 deletions benchmark/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Wire the TPC-H sub-commands into the top-level `benchmark` CLI group.
from benchmark.cli import benchmark_group
from benchmark.tpch.cli import tpch_group

benchmark_group.add_command(tpch_group)
9 changes: 9 additions & 0 deletions benchmark/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import click

from misc.utils import DBGymConfig


@click.group(name="benchmark")
@click.pass_obj
def benchmark_group(config: DBGymConfig):
    """Top-level `benchmark` CLI group; benchmark-specific subgroups attach here."""
    config.append_group("benchmark")
Empty file added benchmark/tpch/__init__.py
Empty file.
217 changes: 217 additions & 0 deletions benchmark/tpch/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
import logging
from pathlib import Path

import click
from sqlalchemy import create_engine

from misc.utils import DBGymConfig
from util.shell import subprocess_run
from util.sql import *

# Module-level logger for all TPC-H benchmark tooling in this file.
benchmark_tpch_logger = logging.getLogger("benchmark/tpch")
benchmark_tpch_logger.setLevel(logging.INFO)


@click.group(name="tpch")
@click.pass_obj
def tpch_group(config: DBGymConfig):
    """`tpch` CLI subgroup for generating and loading TPC-H data."""
    config.append_group("tpch")


@tpch_group.command(name="generate-sf")
@click.argument("sf", type=int)
@click.pass_obj
def tpch_generate_sf(config: DBGymConfig, sf: int):
    """Generate TPC-H table data for scale factor SF (clones tpch-kit first if needed)."""
    clone(config)
    generate_tables(config, sf)


@tpch_group.command(name="generate-workload")
@click.argument("workload-name", type=str)
@click.argument("seed-start", type=int)
@click.argument("seed-end", type=int)
@click.option(
    # NOTE(review): underscore style is inconsistent with the dashed argument
    # names above; renaming to --generate-type would break existing invocations,
    # so it is left as-is and only flagged here.
    "--generate_type",
    type=click.Choice(["sequential", "even", "odd"]),
    default="sequential",
)
@click.pass_obj
def tpch_generate_workload(
    config: DBGymConfig,
    workload_name: str,
    seed_start: int,
    seed_end: int,
    generate_type: str,
):
    """Generate a named workload of TPC-H queries seeded with [SEED_START, SEED_END] inclusive."""
    clone(config)
    generate_queries(config, seed_start, seed_end)
    generate_workload(config, workload_name, seed_start, seed_end, generate_type)


@tpch_group.command(name="load-sf")
@click.argument("sf", type=int)
@click.argument("dbms", type=str)
@click.argument("dbname", type=str)
@click.pass_obj
def tpch_load_tables(config: DBGymConfig, sf: int, dbms: str, dbname: str):
    """Generate (if needed) and load scale-factor-SF table data into DBNAME on DBMS."""
    clone(config)
    generate_tables(config, sf)
    load_tables(config, sf, dbms, dbname)


def clone(config: DBGymConfig):
    """Build tpch-kit via tpch_setup.sh and symlink the result.

    Idempotent: if the `tpch-kit` symlink already exists, nothing is done.
    """
    link_path = config.cur_symlinks_build_path("tpch-kit")
    if link_path.exists():
        benchmark_tpch_logger.info(f"Skipping clone: {link_path}")
        return

    benchmark_tpch_logger.info(f"Cloning: {link_path}")
    build_dir = config.cur_task_runs_build_path()
    # tpch_setup.sh clones/builds tpch-kit into the per-run build directory.
    subprocess_run(f"./tpch_setup.sh {build_dir}", cwd=config.cur_source_path())
    # Expose the freshly built kit at a stable symlinked location.
    link_parent = config.cur_symlinks_build_path(mkdir=True)
    subprocess_run(f"ln -s {build_dir / 'tpch-kit'} {link_parent}")
    benchmark_tpch_logger.info(f"Cloned: {link_path}")


def generate_queries(config: DBGymConfig, seed_start: int, seed_end: int):
    """Generate the 22 TPC-H query files for every seed in [seed_start, seed_end].

    Each seed's queries land in a per-run directory which is then symlinked
    into the data path as `queries_{seed}`; seeds whose symlink already
    exists are skipped.
    """
    build_path = config.cur_symlinks_build_path()
    assert build_path.exists()

    data_path = config.cur_symlinks_data_path(mkdir=True)
    benchmark_tpch_logger.info(
        f"Generating queries: {data_path} [{seed_start}, {seed_end}]"
    )
    for seed in range(seed_start, seed_end + 1):
        symlinked_seed = data_path / f"queries_{seed}"
        if symlinked_seed.exists():
            continue

        real_dir = config.cur_task_runs_data_path(f"queries_{seed}", mkdir=True)
        # TPC-H defines query templates 1..22.
        for i in range(1, 22 + 1):
            target_sql = (real_dir / f"{i}.sql").resolve()
            subprocess_run(
                f"DSS_QUERY=./queries ./qgen {i} -r {seed} > {target_sql}",
                cwd=build_path / "tpch-kit" / "dbgen",
                verbose=False,
            )
        subprocess_run(f"ln -s {real_dir} {data_path}", verbose=False)
    benchmark_tpch_logger.info(
        f"Generated queries: {data_path} [{seed_start}, {seed_end}]"
    )


def generate_tables(config: DBGymConfig, sf: int):
    """Run dbgen to produce the TPC-H .tbl files for scale factor `sf`.

    Idempotent: skipped when the `tables_sf{sf}` symlink already exists.
    """
    kit_path = config.cur_symlinks_build_path()
    assert kit_path.exists()

    data_root = config.cur_symlinks_data_path(mkdir=True)
    link_dir = data_root / f"tables_sf{sf}"
    if link_dir.exists():
        benchmark_tpch_logger.info(f"Skipping generation: {link_dir}")
        return

    benchmark_tpch_logger.info(f"Generating: {link_dir}")
    dbgen_cwd = kit_path / "tpch-kit" / "dbgen"
    subprocess_run(f"./dbgen -vf -s {sf}", cwd=dbgen_cwd)
    # dbgen writes the .tbl files into its own directory; move them out
    # into the per-run data directory before symlinking.
    out_dir = config.cur_task_runs_data_path(f"tables_sf{sf}", mkdir=True)
    subprocess_run(f"mv ./*.tbl {out_dir}", cwd=dbgen_cwd)

    subprocess_run(f"ln -s {out_dir} {data_root}")
    benchmark_tpch_logger.info(f"Generated: {link_dir}")


def generate_workload(
    config: DBGymConfig,
    workload_name: str,
    seed_start: int,
    seed_end: int,
    generate_type: str,
):
    """Materialize a named workload as an `order.txt` listing of query files.

    Each line of order.txt is "S{seed}-Q{qnum},{path_to_sql}", covering every
    seed in [seed_start, seed_end] for the queries selected by generate_type
    ("sequential" = all 22, "even"/"odd" = the matching query numbers).

    Raises:
        RuntimeError: if the workload directory already exists.
        ValueError: if generate_type is not one of the known choices.
    """
    data_path = config.cur_symlinks_data_path(mkdir=True)
    workload_path = data_path / f"workload_{workload_name}"
    if workload_path.exists():
        benchmark_tpch_logger.error(f"Workload directory exists: {workload_path}")
        raise RuntimeError(f"Workload directory exists: {workload_path}")

    benchmark_tpch_logger.info(f"Generating: {workload_path}")
    # BUG FIX: previously f"workload_{workload_path}", which embedded the full
    # symlink Path in the run-directory name instead of the workload name.
    real_dir = config.cur_task_runs_data_path(f"workload_{workload_name}", mkdir=True)

    if generate_type == "sequential":
        queries = [f"{i}" for i in range(1, 22 + 1)]
    elif generate_type == "even":
        queries = [f"{i}" for i in range(1, 22 + 1) if i % 2 == 0]
    elif generate_type == "odd":
        queries = [f"{i}" for i in range(1, 22 + 1) if i % 2 == 1]
    else:
        # Click's Choice() guards the CLI path; guard programmatic callers too
        # instead of iterating over None below.
        raise ValueError(f"Unknown generate_type: {generate_type}")

    with open(real_dir / "order.txt", "w") as f:
        for seed in range(seed_start, seed_end + 1):
            for qnum in queries:
                sqlfile = data_path / f"queries_{seed}" / f"{qnum}.sql"
                assert sqlfile.exists()
                output = ",".join([f"S{seed}-Q{qnum}", str(sqlfile)])
                print(output, file=f)
    # TODO(WAN): add option to deep-copy the workload.
    subprocess_run(f"ln -s {real_dir} {data_path}")
    benchmark_tpch_logger.info(f"Generated: {workload_path}")


def _loaded(conn: Connection):
    """Return True if TPC-H appears to be fully loaded in this database."""
    # l_sk_pk is the last index that we create, so its presence implies the
    # whole load (schema + COPY + constraints) completed.
    check_sql = "SELECT * FROM pg_indexes WHERE indexname = 'l_sk_pk'"
    rows = conn_execute(conn, check_sql).fetchall()
    return len(rows) > 0


def _load(config: DBGymConfig, conn: Connection, sf: int):
    """Create the TPC-H schema, bulk-COPY all tables at scale factor `sf`,
    then apply constraints and indexes."""
    schema_root = config.cur_source_path()
    data_root = config.cur_symlinks_data_path()

    # Load order puts parent tables before children (FKs are added afterwards).
    tables = [
        "region",
        "nation",
        "part",
        "supplier",
        "partsupp",
        "customer",
        "orders",
        "lineitem",
    ]

    sql_file_execute(conn, schema_root / "tpch_schema.sql")
    # Truncate everything up front so a partial prior load leaves no stale rows.
    for tbl in tables:
        conn_execute(conn, f"TRUNCATE {tbl} CASCADE")
    for tbl in tables:
        tbl_file = data_root / f"tables_sf{sf}" / f"{tbl}.tbl"

        # Stream the .tbl file into Postgres in 8 KiB chunks via the driver's
        # COPY protocol (reaching through SQLAlchemy to the DBAPI cursor).
        with open(tbl_file, "r") as src, conn.connection.dbapi_connection.cursor() as cur:
            with cur.copy(f"COPY {tbl} FROM STDIN CSV DELIMITER '|'") as copy:
                while chunk := src.read(8192):
                    copy.write(chunk)
    sql_file_execute(conn, schema_root / "tpch_constraints.sql")


def load_tables(config: DBGymConfig, sf: int, dbms: str, dbname: str):
    """Load TPC-H at scale factor `sf` into `dbname` on `dbms`, skipping if loaded.

    Raises RuntimeError for any DBMS other than "postgres".
    """
    # TODO(WAN): repetition and slight break of abstraction here.
    dbms_cfg = config.root_yaml["dbms"][dbms]
    user = dbms_cfg["user"]
    password = dbms_cfg["pass"]
    port = dbms_cfg["port"]

    if dbms != "postgres":
        raise RuntimeError(f"Unknown DBMS: {dbms}")
    connstr = f"postgresql+psycopg://{user}:{password}@localhost:{port}/{dbname}"

    # AUTOCOMMIT so each DDL / TRUNCATE / COPY statement applies immediately.
    engine: Engine = create_engine(
        connstr,
        execution_options={"isolation_level": "AUTOCOMMIT"},
    )
    with engine.connect() as conn:
        if _loaded(conn):
            benchmark_tpch_logger.info(f"Skipping load: TPC-H SF {sf}")
            return
        benchmark_tpch_logger.info(f"Loading: TPC-H SF {sf}")
        _load(config, conn, sf)
        benchmark_tpch_logger.info(f"Loaded: TPC-H SF {sf}")
32 changes: 32 additions & 0 deletions benchmark/tpch/tpch_constraints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Foreign-key constraints between the TPC-H tables. These are applied after
-- the bulk load (see _load in benchmark/tpch/cli.py runs tpch_schema.sql,
-- COPY, then this file).
ALTER TABLE nation ADD CONSTRAINT nation_n_regionkey_fkey FOREIGN KEY (n_regionkey) REFERENCES region (r_regionkey) ON DELETE CASCADE;
ALTER TABLE supplier ADD CONSTRAINT supplier_s_nationkey_fkey FOREIGN KEY (s_nationkey) REFERENCES nation (n_nationkey) ON DELETE CASCADE;
ALTER TABLE partsupp ADD CONSTRAINT partsupp_ps_partkey_fkey FOREIGN KEY (ps_partkey) REFERENCES part (p_partkey) ON DELETE CASCADE;
ALTER TABLE partsupp ADD CONSTRAINT partsupp_ps_suppkey_fkey FOREIGN KEY (ps_suppkey) REFERENCES supplier (s_suppkey) ON DELETE CASCADE;
ALTER TABLE customer ADD CONSTRAINT customer_c_nationkey_fkey FOREIGN KEY (c_nationkey) REFERENCES nation (n_nationkey) ON DELETE CASCADE;
ALTER TABLE orders ADD CONSTRAINT orders_o_custkey_fkey FOREIGN KEY (o_custkey) REFERENCES customer (c_custkey) ON DELETE CASCADE;
ALTER TABLE lineitem ADD CONSTRAINT lineitem_l_orderkey_fkey FOREIGN KEY (l_orderkey) REFERENCES orders (o_orderkey) ON DELETE CASCADE;
ALTER TABLE lineitem ADD CONSTRAINT lineitem_l_partkey_l_suppkey_fkey FOREIGN KEY (l_partkey, l_suppkey) REFERENCES partsupp (ps_partkey, ps_suppkey) ON DELETE CASCADE;

-- Secondary indexes. NOTE: l_sk_pk is deliberately created last; _loaded() in
-- benchmark/tpch/cli.py checks for it to decide whether the load completed.
CREATE UNIQUE INDEX r_rk ON region (r_regionkey ASC);
CREATE UNIQUE INDEX n_nk ON nation (n_nationkey ASC);
CREATE INDEX n_rk ON nation (n_regionkey ASC);
CREATE UNIQUE INDEX p_pk ON part (p_partkey ASC);
CREATE UNIQUE INDEX s_sk ON supplier (s_suppkey ASC);
CREATE INDEX s_nk ON supplier (s_nationkey ASC);
CREATE INDEX ps_pk ON partsupp (ps_partkey ASC);
CREATE INDEX ps_sk ON partsupp (ps_suppkey ASC);
CREATE UNIQUE INDEX ps_pk_sk ON partsupp (ps_partkey ASC, ps_suppkey ASC);
CREATE UNIQUE INDEX ps_sk_pk ON partsupp (ps_suppkey ASC, ps_partkey ASC);
CREATE UNIQUE INDEX c_ck ON customer (c_custkey ASC);
CREATE INDEX c_nk ON customer (c_nationkey ASC);
CREATE UNIQUE INDEX o_ok ON orders (o_orderkey ASC);
CREATE INDEX o_ck ON orders (o_custkey ASC);
CREATE INDEX o_od ON orders (o_orderdate ASC);
CREATE INDEX l_ok ON lineitem (l_orderkey ASC);
CREATE INDEX l_pk ON lineitem (l_partkey ASC);
CREATE INDEX l_sk ON lineitem (l_suppkey ASC);
CREATE INDEX l_sd ON lineitem (l_shipdate ASC);
CREATE INDEX l_cd ON lineitem (l_commitdate ASC);
CREATE INDEX l_rd ON lineitem (l_receiptdate ASC);
CREATE INDEX l_pk_sk ON lineitem (l_partkey ASC, l_suppkey ASC);
CREATE INDEX l_sk_pk ON lineitem (l_suppkey ASC, l_partkey ASC);
16 changes: 16 additions & 0 deletions benchmark/tpch/tpch_queries.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

# Generate the 22 TPC-H query files for each seed in
# [TPCH_QUERY_START, TPCH_QUERY_STOP]; seeds that already have a directory
# under TPCH_QUERY_ROOT are skipped.
# Required environment variables: TPCH_REPO_ROOT, TPCH_QUERY_ROOT,
# TPCH_QUERY_START, TPCH_QUERY_STOP, ROOT_DIR.

set -euxo pipefail

cd "${TPCH_REPO_ROOT}/dbgen"
# Turn off xtrace for the loop so the log isn't flooded with per-query lines.
set +x
for seed in $(seq "${TPCH_QUERY_START}" "${TPCH_QUERY_STOP}"); do
if [ ! -d "${TPCH_QUERY_ROOT}/${seed}" ]; then
mkdir -p "${TPCH_QUERY_ROOT}/${seed}"
for qnum in {1..22}; do
# qgen instantiates query template $qnum with RNG seed $seed.
DSS_QUERY="./queries" ./qgen "${qnum}" -r "${seed}" > "${TPCH_QUERY_ROOT}/${seed}/${qnum}.sql"
done
fi
done
set -x
cd "${ROOT_DIR}"
104 changes: 104 additions & 0 deletions benchmark/tpch/tpch_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
-- Copied over from https://github.com/cmu-db/benchbase/blob/main/src/main/resources/benchmarks/tpch/ddl-postgres.sql
-- as of bfa3184e7a09c0b9a57beff38cc5b6794fab53c3 , FKs and indexes in a separate file.

-- Recreate from scratch: drop any existing TPC-H tables (CASCADE removes
-- dependent FKs regardless of drop order).
DROP TABLE IF EXISTS nation CASCADE;
DROP TABLE IF EXISTS region CASCADE;
DROP TABLE IF EXISTS part CASCADE;
DROP TABLE IF EXISTS supplier CASCADE;
DROP TABLE IF EXISTS partsupp CASCADE;
DROP TABLE IF EXISTS orders CASCADE;
DROP TABLE IF EXISTS customer CASCADE;
DROP TABLE IF EXISTS lineitem CASCADE;

-- Tables only; FK constraints and secondary indexes live in
-- tpch_constraints.sql so they can be applied after the bulk load.
CREATE TABLE region (
    r_regionkey integer NOT NULL,
    r_name char(25) NOT NULL,
    r_comment varchar(152),
    PRIMARY KEY (r_regionkey)
);

CREATE TABLE nation (
    n_nationkey integer NOT NULL,
    n_name char(25) NOT NULL,
    n_regionkey integer NOT NULL,
    n_comment varchar(152),
    PRIMARY KEY (n_nationkey)
);

CREATE TABLE part (
    p_partkey integer NOT NULL,
    p_name varchar(55) NOT NULL,
    p_mfgr char(25) NOT NULL,
    p_brand char(10) NOT NULL,
    p_type varchar(25) NOT NULL,
    p_size integer NOT NULL,
    p_container char(10) NOT NULL,
    p_retailprice decimal(15, 2) NOT NULL,
    p_comment varchar(23) NOT NULL,
    PRIMARY KEY (p_partkey)
);

CREATE TABLE supplier (
    s_suppkey integer NOT NULL,
    s_name char(25) NOT NULL,
    s_address varchar(40) NOT NULL,
    s_nationkey integer NOT NULL,
    s_phone char(15) NOT NULL,
    s_acctbal decimal(15, 2) NOT NULL,
    s_comment varchar(101) NOT NULL,
    PRIMARY KEY (s_suppkey)
);

CREATE TABLE partsupp (
    ps_partkey integer NOT NULL,
    ps_suppkey integer NOT NULL,
    ps_availqty integer NOT NULL,
    ps_supplycost decimal(15, 2) NOT NULL,
    ps_comment varchar(199) NOT NULL,
    PRIMARY KEY (ps_partkey, ps_suppkey)
);

CREATE TABLE customer (
    c_custkey integer NOT NULL,
    c_name varchar(25) NOT NULL,
    c_address varchar(40) NOT NULL,
    c_nationkey integer NOT NULL,
    c_phone char(15) NOT NULL,
    c_acctbal decimal(15, 2) NOT NULL,
    c_mktsegment char(10) NOT NULL,
    c_comment varchar(117) NOT NULL,
    PRIMARY KEY (c_custkey)
);

CREATE TABLE orders (
    o_orderkey integer NOT NULL,
    o_custkey integer NOT NULL,
    o_orderstatus char(1) NOT NULL,
    o_totalprice decimal(15, 2) NOT NULL,
    o_orderdate date NOT NULL,
    o_orderpriority char(15) NOT NULL,
    o_clerk char(15) NOT NULL,
    o_shippriority integer NOT NULL,
    o_comment varchar(79) NOT NULL,
    PRIMARY KEY (o_orderkey)
);

CREATE TABLE lineitem (
    l_orderkey integer NOT NULL,
    l_partkey integer NOT NULL,
    l_suppkey integer NOT NULL,
    l_linenumber integer NOT NULL,
    l_quantity decimal(15, 2) NOT NULL,
    l_extendedprice decimal(15, 2) NOT NULL,
    l_discount decimal(15, 2) NOT NULL,
    l_tax decimal(15, 2) NOT NULL,
    l_returnflag char(1) NOT NULL,
    l_linestatus char(1) NOT NULL,
    l_shipdate date NOT NULL,
    l_commitdate date NOT NULL,
    l_receiptdate date NOT NULL,
    l_shipinstruct char(25) NOT NULL,
    l_shipmode char(10) NOT NULL,
    l_comment varchar(44) NOT NULL,
    PRIMARY KEY (l_orderkey, l_linenumber)
);
Loading

0 comments on commit df6a166

Please sign in to comment.