Skip to content

Commit

Permalink
Implements temporal-upserts.
Browse files Browse the repository at this point in the history
  • Loading branch information
grischoun committed Jul 29, 2020
1 parent 6a3fbdc commit 501aa72
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 76 deletions.
38 changes: 10 additions & 28 deletions src/datahike/db.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -1165,42 +1165,21 @@
(validate-datom-upsert db datom)
(let [indexing? (indexing? db (.-a datom))
schema? (ds/schema-attr? (.-a datom))
keep-history? (and (-keep-history? db) (not (no-history? db (.-a datom))))
db (if (or keep-history? indexing?)
(if-some [old ^Datom (first (-search db [(.-e datom) (.-a datom)]))]
(let [[e a v t _] old
^Datom old-retracted (dd/datom e a v
(if keep-history? t (datom-tx datom))
false)]
(cond-> db
indexing? (update-in [:avet] #(di/-remove % old :avet))
;; keep-history? (update-in [:temporal-eavt] #(di/-insert % old :eavt))
;; keep-history? (update-in [:temporal-eavt] #(di/-insert % old-retracted :eavt))

;; keep-history? (update-in [:temporal-aevt] #(di/-insert % old :aevt))
;; keep-history? (update-in [:temporal-aevt] #(di/-insert % old-retracted :aevt))


;; (and keep-history? indexing?) (update-in [:temporal-avet] #(di/-insert % old :avet))
;; (and keep-history? indexing?) (update-in [:temporal-avet] #(di/-insert % old-retracted :avet))
))
db)
db)]

keep-history? (and (-keep-history? db) (not (no-history? db (.-a datom))))]
(cond-> db
;; Optimistic remove of the schema entry
schema? (try
(-> db (remove-schema datom) update-rschema)
(catch clojure.lang.ExceptionInfo e
db))

keep-history? (update-in [:temporal-eavt] #(di/-upsert % datom true :eavt))
true (update-in [:eavt] #(di/-upsert % datom false :eavt))
keep-history? (update-in [:temporal-eavt] #(di/-temporal-upsert % datom :eavt))
true (update-in [:eavt] #(di/-upsert % datom :eavt))

keep-history? (update-in [:temporal-aevt] #(di/-upsert % datom true :aevt))
true (update-in [:aevt] #(di/-upsert % datom false :aevt))
keep-history? (update-in [:temporal-aevt] #(di/-temporal-upsert % datom :aevt))
true (update-in [:aevt] #(di/-upsert % datom :aevt))

(and keep-history? indexing?) (update-in [:temporal-avet] #(di/-upsert % datom true :avet))
(and keep-history? indexing?) (update-in [:temporal-avet] #(di/-temporal-upsert % datom :avet))
indexing? (update-in [:avet] #(di/-insert % datom :avet))

true (advance-max-eid (.-e datom))
Expand Down Expand Up @@ -1590,7 +1569,10 @@
(= op :db.history.purge/before)
(if (-keep-history? db)
(let [history (HistoricalDB. db)
e-datoms (-> (search-temporal-indices db nil) vec (filter-before e db) vec)
e-datoms (mapcat
(fn [d] [d (datom (.-e d) (.-a d) (.-v d) (.-tx d) true)])
(-> (search-temporal-indices db [nil nil nil nil false])
vec (filter-before e db) vec))
retracted-comps (purge-components history e-datoms)]
(recur (reduce transact-purge-datom report e-datoms)
(concat retracted-comps entities)))
Expand Down
9 changes: 6 additions & 3 deletions src/datahike/index.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
(-seq [index])
(-count [index])
(-insert [index datom index-type])
(-upsert [index datom temporal? index-type])
(-upsert [index datom index-type])
(-temporal-upsert [index datom index-type])
(-remove [index datom index-type])
(-slice [index from to index-type])
(-flush [index backend])
Expand All @@ -28,8 +29,10 @@
(dih/-count eavt-tree :eavt))
(-insert [tree datom index-type]
(dih/-insert tree datom index-type))
(-upsert [tree datom temporal? index-type]
(dih/-upsert tree datom temporal? index-type))
(-upsert [tree datom index-type]
(dih/-upsert tree datom index-type))
(-temporal-upsert [tree datom index-type]
(dih/-temporal-upsert tree datom index-type))
(-remove [tree datom index-type]
(dih/-remove tree datom index-type))
(-slice [tree from to index-type]
Expand Down
100 changes: 56 additions & 44 deletions src/datahike/index/hitchhiker_tree.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,81 @@
#?(:clj (:import [clojure.lang AMapEntry]
[datahike.datom Datom])))

(defn- datom->node [^Datom datom index-type]
(case index-type
:aevt [(.-a datom) (.-e datom) (.-v datom) (.-tx datom)]
:avet [(.-a datom) (.-v datom) (.-e datom) (.-tx datom)]
[(.-e datom) (.-a datom) (.-v datom) (.-tx datom)]))

(defn old-key
"Removes old key from map using remove-fn function if new and old keys' first 2 entries match."
"Returns an old version of the 'new' key if it exists in 'map'"
[map new]
(let [[a b _ _] new]
(when (seq map)
(when-let [[[oa ob oc od] _] (first (subseq map >= [a b nil nil]))]
(when (and (= (kc/-compare a oa) 0) (= (kc/-compare b ob) 0))
[oa ob oc od])))))

(defn remove-old
"Removes old key from map using remove-fn function if new and old keys' first 2 entries match."
[map new remove-fn]
(when-let [old (old-key map new)]
(remove-fn old)))

(defrecord UpsertOp [key value]
op/IOperation
(-affects-key [_] key)
(-apply-op-to-coll [_ map]
(-> (or (remove-old map key (partial dissoc map)) map)
(assoc key value)))
(-apply-op-to-tree [_ tree]
(let [children (cond
(tree/data-node? tree) (:children tree)
:else (:children (peek (tree/lookup-path tree key))))]
(-> (or (remove-old children key (partial tree/delete tree)) tree)
(tree/insert key value)))))

(defn *apply-op-to-tree [tree key value temporal?]
(let [children (cond
(tree/data-node? tree) (:children tree)
:else (:children (peek (tree/lookup-path tree key))))
old (old-key children key)]
(-> (if temporal?
(if old
(let [[a b c ot] old
[_ _ _ nt] key
old-retracted [a b c (- ot)]]
(tree/insert tree old-retracted old-retracted))
tree)
(if old
(tree/delete tree old)
tree))
(tree/insert key value))))

(defrecord UpsertOp [key value temporal?]
(defn old-retracted [map key]
(when-let [old (old-key map key)]
(let [[a b c ot] old]
[a b c (- ot)])))

(defrecord temporal-UpsertOp [key value]
op/IOperation
(-affects-key [_] key)
(-apply-op-to-coll [_ map]
(let [old (old-key map key)]
(-> (if temporal?
(if old
(let [[a b c ot] old
[_ _ _ nt] key
old-retracted [a b c (- ot)]]
(assoc map old-retracted old-retracted))
map)
(if old
(dissoc map old)
map))
(let [old-retracted (old-retracted map key)]
(-> (if old-retracted
(assoc map old-retracted old-retracted)
map)
(assoc key value))))
(-apply-op-to-tree [_ tree]
(*apply-op-to-tree tree key value temporal?)))
(let [children (cond
(tree/data-node? tree) (:children tree)
:else (:children (peek (tree/lookup-path tree key))))
old-retracted (old-retracted children key)]
(-> (if old-retracted
(tree/insert tree old-retracted old-retracted)
tree)
(tree/insert key value)))))


(defn new-UpsertOp [key value]
(UpsertOp. key value))

(defn new-UpsertOp [key value temporal?]
(UpsertOp. key value temporal?))
(defn new-temporal-UpsertOp [key value]
(temporal-UpsertOp. key value))

(defn -upsert [tree ^Datom datom index-type]
(async/<?? (hmsg/upsert tree (new-UpsertOp
(datom->node datom index-type)
(datom->node datom index-type)))))

(defn -temporal-upsert [tree ^Datom datom index-type]
(async/<?? (hmsg/upsert tree (new-temporal-UpsertOp
(datom->node datom index-type)
(datom->node datom index-type)))))

(extend-protocol kc/IKeyCompare
clojure.lang.PersistentVector
Expand Down Expand Up @@ -96,12 +120,6 @@
:avet (fn [a v e tx] (dd/datom e a v tx true))
(fn [e a v tx] (dd/datom e a v tx true))))

(defn- datom->node [^Datom datom index-type]
(case index-type
:aevt [(.-a datom) (.-e datom) (.-v datom) (.-tx datom)]
:avet [(.-a datom) (.-v datom) (.-e datom) (.-tx datom)]
[(.-e datom) (.-a datom) (.-v datom) (.-tx datom)]))

(defn- from-datom [^Datom datom index-type]
(let [datom-seq (case index-type
:aevt (list (.-a datom) (.-e datom) (.-v datom) (.-tx datom))
Expand Down Expand Up @@ -170,12 +188,6 @@
(defn -insert [tree ^Datom datom index-type]
(hmsg/insert tree (datom->node datom index-type) nil))

(defn -upsert [tree ^Datom datom temporal? index-type]
(async/<?? (hmsg/upsert tree (new-UpsertOp
(datom->node datom index-type)
(datom->node datom index-type)
temporal?))))

(defn init-tree
"Create tree with datoms"
[datoms index-type]
Expand Down
2 changes: 1 addition & 1 deletion test/datahike/test/upsert_implem_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

(defn upsert-helper
[t k]
(ha/<?? (msg/upsert t (ht/new-UpsertOp k k false))))
(ha/<?? (msg/upsert t (ht/new-UpsertOp k k))))


(deftest upsert
Expand Down

0 comments on commit 501aa72

Please sign in to comment.