Skip to content

Commit

Permalink
Add refreshing materialized views with dependencies
Browse files Browse the repository at this point in the history
We had issues with refreshing complex chains of materialized views that
depended on one another. For example, we had the following type of thing
happening.

(imagine all the relations below are materialized views)

A depends on B
B depends on C and D
C depends on D
D depends only on non-materialized views

Now if you wanted to refresh A, and you wanted it to be as fresh as possible,
first you would need to refresh the materialized views on which it depends,
followed by A itself. This became frequent enough, and changed with enough
frequency, to make keeping refresh methods for each individual materialized
view unreasonable.

I realized we could use the information kept in `pg_depend` for this purpose,
and also thought that I'm most likely not the first person to have this
problem, so I'm contributing it back here. Also, I can see further improvements
to the migrations being built off of this usage of the dependency graph stored
in Postgres, so I thought it might help!
  • Loading branch information
devonestes committed Nov 16, 2016
1 parent a2dc1ae commit 75e12da
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 2 deletions.
2 changes: 1 addition & 1 deletion lib/generators/scenic/model/templates/model.erb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Refreshes the materialized view behind this model through the Scenic
# database adapter, passing the model's table name.
# Both calls below pass `concurrently: false`; the second additionally passes
# `cascade: false`.
# NOTE(review): this hunk appears to show the removed and the added line of a
# one-line diff back to back — confirm that only the `cascade:` variant is
# meant to remain in the template.
def self.refresh
Scenic.database.refresh_materialized_view(table_name, concurrently: false)
Scenic.database.refresh_materialized_view(table_name, concurrently: false, cascade: false)
end
12 changes: 11 additions & 1 deletion lib/scenic/adapters/postgres.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require_relative "postgres/index_reapplication"
require_relative "postgres/indexes"
require_relative "postgres/views"
require_relative "postgres/refresh_dependencies"

module Scenic
# Scenic database adapters.
Expand Down Expand Up @@ -195,8 +196,9 @@ def drop_materialized_view(name)
# Scenic.database.refresh_materialized_view(:posts, concurrent: true)
#
# @return [void]
def refresh_materialized_view(name, concurrently: false)
def refresh_materialized_view(name, concurrently: false, cascade: false)
raise_unless_materialized_views_supported
refresh_dependencies_for(name) if cascade

if concurrently
raise_unless_concurrent_refresh_supported
Expand Down Expand Up @@ -226,6 +228,14 @@ def raise_unless_concurrent_refresh_supported
raise ConcurrentRefreshesNotSupportedError
end
end

# Delegates to RefreshDependencies, handing it the view name, this adapter
# and this adapter's connection.
#
# @param name the materialized view whose dependencies should be refreshed
# @return whatever RefreshDependencies.call returns
def refresh_dependencies_for(name)
  refresher = Scenic::Adapters::Postgres::RefreshDependencies
  refresher.call(name, self, connection)
end
end
end
end
67 changes: 67 additions & 0 deletions lib/scenic/adapters/postgres/refresh_dependencies.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
require "tsort"

module Scenic
  module Adapters
    class Postgres
      # Refreshes the materialized views that a given materialized view depends
      # on, directly or transitively, so that each view is refreshed only after
      # the views it reads from.
      #
      # The dependency graph is read from the Postgres catalog with
      # DEPENDENCY_SQL. Only views reachable from the requested view are
      # refreshed; unrelated materialized views are left untouched, and the
      # requested view itself is not refreshed here.
      class RefreshDependencies
        # One row per materialized view (relkind 'm'): its schema-qualified
        # name and an aggregated array of the schema-qualified materialized
        # views it depends on.
        DEPENDENCY_SQL = <<-SQL.freeze
          SELECT r_ns.nspname || '.' || cl_r.relname AS materialized_view,
          array_agg(d_ns.nspname || '.' || cl_d.relname) AS depends_on
          FROM pg_rewrite AS r
          JOIN pg_class AS cl_r ON r.ev_class=cl_r.oid
          JOIN pg_depend AS d ON r.oid=d.objid
          JOIN pg_class AS cl_d ON d.refobjid=cl_d.oid
          JOIN pg_namespace AS r_ns ON r_ns.oid = cl_r.relnamespace
          JOIN pg_namespace AS d_ns ON d_ns.oid = cl_d.relnamespace
          WHERE cl_d.relkind = 'm'
          AND cl_r.relkind = 'm'
          AND cl_d.relname != cl_r.relname
          GROUP BY cl_r.relname, r_ns.nspname
          ORDER BY cl_r.relname;
        SQL

        # Convenience entry point: builds an instance and runs it.
        #
        # @param name [String, Symbol] view whose dependencies are refreshed;
        #   either schema-qualified ("public.posts") or bare (:posts)
        # @param adapter [#refresh_materialized_view] performs each refresh
        # @param connection [#select_rows] used to run DEPENDENCY_SQL
        # @return [Array<String>] the schema-qualified views that were refreshed
        def self.call(name, adapter, connection)
          new(name, adapter, connection).call
        end

        def initialize(name, adapter, connection)
          @name = name
          @adapter = adapter
          @connection = connection
        end

        # Refreshes each dependency, deepest dependency first.
        #
        # @return [Array<String>] the schema-qualified views that were refreshed
        def call
          dependencies.each do |dependency|
            adapter.refresh_materialized_view(dependency)
          end
        end

        private

        attr_reader :name, :adapter, :connection

        # Transitive dependencies of the requested view in refresh order.
        # Returns an empty list when the view is unknown to the graph or has
        # no materialized dependencies.
        def dependencies
          graph = parse_to_hash(connection.select_rows(DEPENDENCY_SQL))
          target = find_target(graph)
          return [] unless target

          each_child = lambda { |node, &block| graph.fetch(node, []).each(&block) }
          ordered = []
          # Components are yielded children-first, so the target comes last.
          TSort.each_strongly_connected_component_from(target, each_child) do |component|
            ordered.concat(component)
          end
          ordered - [target]
        end

        # Resolves the requested name to a key of the graph. An exact match
        # wins; otherwise a bare name matches the first key whose relation
        # part equals it. Substring matches are deliberately not accepted, so
        # "posts" never resolves to "public.posts_archive".
        def find_target(graph)
          view = name.to_s
          return view if graph.key?(view)

          graph.keys.find { |candidate| candidate.end_with?(".#{view}") }
        end

        # Turns the result rows into { view => [dependency, ...] }, making sure
        # every dependency also appears as a key (with no children by default).
        def parse_to_hash(dependency_rows)
          dependency_rows.each_with_object({}) do |row, hash|
            # The aggregated column is handled as a "{a,b}" array literal.
            formatted_dependencies = row.last.tr("{}", "").split(",")
            formatted_dependencies.each do |dependency|
              hash[dependency] ||= []
            end
            hash[row.first] = formatted_dependencies
          end
        end
      end
    end
  end
end
42 changes: 42 additions & 0 deletions spec/scenic/adapters/postgres/refresh_dependencies_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
require "spec_helper"

module Scenic
  module Adapters
    describe Postgres::RefreshDependencies, :db do
      it "refreshes dependencies in the correct order" do
        adapter = Postgres.new

        # Dependency chain under test:
        #   fourth -> third -> (first, second), second -> first
        # Views are created in insertion order so each one's sources exist.
        {
          "first" => "SELECT text 'hi' AS greeting",
          "second" => "SELECT * from first",
          "third" => "SELECT * from first UNION SELECT * from second",
          "fourth" => "SELECT * from third",
        }.each do |view_name, definition|
          adapter.create_materialized_view(view_name, definition)
        end

        # Every dependency of "fourth" must be refreshed before anything that
        # reads from it; "fourth" itself is not refreshed by this class.
        %w[public.first public.second public.third].each do |dependency|
          expect(adapter).to receive(:refresh_materialized_view).
            with(dependency).ordered
        end

        described_class.call(:fourth, adapter, ActiveRecord::Base.connection)
      end
    end
  end
end
9 changes: 9 additions & 0 deletions spec/scenic/adapters/postgres_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ module Adapters
.to raise_error err
end

it "can refresh the views dependencies first" do
  fake_connection = double("Connection").as_null_object
  connectable = double("Connectable", connection: fake_connection)
  postgres = Postgres.new(connectable)

  # The expectation is registered before the refresh is triggered: with
  # cascade: true the adapter must hand off to RefreshDependencies.
  expect(Scenic::Adapters::Postgres::RefreshDependencies).
    to receive(:call).with(:tests, postgres, fake_connection)

  postgres.refresh_materialized_view(:tests, cascade: true)
end

context "refreshing concurrently" do
it "raises descriptive error if concurrent refresh is not possible" do
adapter = Postgres.new
Expand Down

0 comments on commit 75e12da

Please sign in to comment.