Skip to content

Commit

Permalink
add the support for postgresql's version 9.6+
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaowing committed Feb 16, 2019
1 parent 303958d commit 0a18870
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 91 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,4 @@ the following features are on the way
- [ ] a local persistant buffer to solve the times limit issue of the douban API
- [ ] support the public api for retrieving data of chart "us_box"
- [ ] server-side encoding convert to support the database of which not being UTF8-encoded
- [ ] PostgreSQL 9.6+ support
- [x] PostgreSQL 9.6+ support
180 changes: 102 additions & 78 deletions douban_fdwgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,46 @@ package main
static void ErrorReport(int sqlstate, char *error_msg)
{
ereport(ERROR, (errcode(sqlstate), errmsg(error_msg)));
ereport(ERROR, (errcode(sqlstate), errmsg(error_msg)));
}
static void InfoReport(char *error_msg)
{
ereport(INFO, (errmsg(error_msg)));
ereport(INFO, (errmsg(error_msg)));
}
static Path *createDoubanForeignPath(PlannerInfo *root, RelOptInfo *baserel, Cost startCost, Cost totalCost)
{
return (Path *)create_foreignscan_path(root, baserel,
#if PG_VERSION_NUM >= 90600
NULL,
#endif
baserel->rows, startCost, totalCost, NIL, NULL, NULL, NIL);
}
static void explainPropertyInteger(char *qlabel, long value, ExplainState *es)
{
#if PG_VERSION_NUM >= 110000
ExplainPropertyInteger(qlabel, NULL, value, es);
#else
ExplainPropertyLong(qlabel, value, es);
#endif
}
static void pullVarattnosTargetList(RelOptInfo *baserel, Bitmapset **varattnos)
{
#if PG_VERSION_NUM >= 90600
pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, varattnos);
#else
pull_varattnos((Node *) baserel->reltargetlist, baserel->relid, varattnos);
#endif
}
*/
import "C"
import (
"strconv"
"fmt"
"reflect"
"strconv"
"strings"
"unsafe"
)
Expand Down Expand Up @@ -82,8 +109,8 @@ const (
ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION = 503318936
)

/*
* the following const is used for retrieving data from fdw_private
/*
* the following const is used for retrieving data from fdw_private
* usage: pass the const as the 2nd argument of list_nth() while retrieving data
*/
const (
Expand All @@ -103,17 +130,17 @@ type DoubanOption struct {

// The state passed for the scan.
type DoubanScanState struct {
scanningRel *C.RelationData
attrsRetrieved []*TargetColumnMeta
resultSet []MovieItem
currentRow int
scanningRel *C.RelationData
attrsRetrieved []*TargetColumnMeta
resultSet []MovieItem
currentRow int
}

type TargetColumnMeta struct {
attrNum int /* same as attrNum in PG, starts from 1 */
attrName string
attrType C.Oid
attrTypmod C.int32
attrNum int /* same as attrNum in PG, starts from 1 */
attrName string
attrType C.Oid
attrTypmod C.int32
}

func (self *TargetColumnMeta) convert2Datum(source *MovieItem) C.Datum {
Expand All @@ -122,16 +149,16 @@ func (self *TargetColumnMeta) convert2Datum(source *MovieItem) C.Datum {
var tuple *C.HeapTupleData
var valueDatum, retvalDatum C.Datum

tuple = C.SearchSysCache(C.TYPEOID, C.Datum(self.attrType),
C.Datum(0), C.Datum(0), C.Datum(0))
tuple = C.SearchSysCache(C.TYPEOID, C.Datum(self.attrType),
C.Datum(0), C.Datum(0), C.Datum(0))
if tuple == (*C.HeapTupleData)(nil) {
ereport(ERRCODE_FDW_ERROR, "cache lookup failed for type%d", int(self.attrType))
}

pgtype := (*C.FormData_pg_type)(unsafe.Pointer(uintptr((unsafe.Pointer)(tuple.t_data)) +
uintptr(((*C.HeapTupleHeaderData)((unsafe.Pointer)(tuple.t_data))).t_hoff)))
pgtype := (*C.FormData_pg_type)(unsafe.Pointer(uintptr((unsafe.Pointer)(tuple.t_data)) +
uintptr(((*C.HeapTupleHeaderData)((unsafe.Pointer)(tuple.t_data))).t_hoff)))
typeinput = pgtype.typinput
typemod = C.int(pgtype.typtypmod)
typemod = C.int(pgtype.typtypmod)
C.ReleaseSysCache(tuple)

/* convert the field valut into Datum(de facto: CString) so that we can use the typinput function */
Expand All @@ -141,22 +168,22 @@ func (self *TargetColumnMeta) convert2Datum(source *MovieItem) C.Datum {

var itemValStr string
switch self.attrName {
case "rating":
itemValStr = strconv.FormatFloat(float64(source.GetAverageScore()), 'f', 2, 32)
case "genres":
itemValStr = source.GetGenres()
case "casts":
itemValStr = source.GetCasts()
case "directors":
itemValStr = source.GetDirectors()
case "collectcount":
itemValStr = strconv.Itoa(source.CollectCount)
default:
for j := 0; j < movieItemType.NumField(); j++ {
if self.attrName == strings.ToLower(movieItemType.Field(j).Name) {
itemValStr = movieItemValue.FieldByName(movieItemType.Field(j).Name).String()
}
case "rating":
itemValStr = strconv.FormatFloat(float64(source.GetAverageScore()), 'f', 2, 32)
case "genres":
itemValStr = source.GetGenres()
case "casts":
itemValStr = source.GetCasts()
case "directors":
itemValStr = source.GetDirectors()
case "collectcount":
itemValStr = strconv.Itoa(source.CollectCount)
default:
for j := 0; j < movieItemType.NumField(); j++ {
if self.attrName == strings.ToLower(movieItemType.Field(j).Name) {
itemValStr = movieItemValue.FieldByName(movieItemType.Field(j).Name).String()
}
}
}
valueDatum = C.Datum(uintptr(unsafe.Pointer(C.CString(itemValStr))))

Expand All @@ -180,17 +207,16 @@ func doubanGetForeignRelSize(root *C.PlannerInfo,
var referredAttrs *C.Bitmapset

// Collect all the attributes needed for joins or final output.
targetlist := (*C.Node)(unsafe.Pointer(baserel.reltargetlist)) // TODO: member field of 'RelOptInfo' changed in 9.6
C.pull_varattnos(targetlist, baserel.relid, (**C.Bitmapset)(unsafe.Pointer(&referredAttrs)))
C.pullVarattnosTargetList(baserel, (**C.Bitmapset)(unsafe.Pointer(&referredAttrs)))

// Add all the attributes used by restriction clauses.
restrictNum := int(C.list_length(baserel.baserestrictinfo))
for i := 0; i < restrictNum; i++ {
rinfo := (*C.RestrictInfo)(unsafe.Pointer(uintptr(C.list_nth(baserel.baserestrictinfo, C.int(i)))))
C.pull_varattnos((*C.Node)(unsafe.Pointer(rinfo.clause)), baserel.relid,
(**C.Bitmapset)(unsafe.Pointer(&referredAttrs)))
(**C.Bitmapset)(unsafe.Pointer(&referredAttrs)))
}

// check if the name of the referred attrs are valid
attributesRetrieved := referredFieldsValidator(foreigntableid, referredAttrs)
C.bms_free(referredAttrs)
Expand Down Expand Up @@ -228,13 +254,13 @@ func referredFieldsValidator(foreigntableId C.Oid, referredFields *C.Bitmapset)
retval := make([]*TargetColumnMeta, 0, nattrs)

tempMovieItem := MovieItem{}
movieItemType := reflect.TypeOf(tempMovieItem) /* a struct-type is necessary for calling the NumField() method */
movieItemType := reflect.TypeOf(tempMovieItem) /* a struct-type is necessary for calling the NumField() method */

attrFound := false
attrslice := (*[1 << 30]C.Form_pg_attribute)(unsafe.Pointer(tupdesc.attrs))[:nattrs:nattrs]

for i := 1; i <= nattrs; i++ {
attr := attrslice[i - 1]
attr := attrslice[i-1]

attrFound = false
// Ignore dropped attributes.
Expand Down Expand Up @@ -284,9 +310,7 @@ func doubanGetForeignPaths(root *C.PlannerInfo,
startupCost := C.Cost(40.0)
totalCost := C.Cost(40.0 + baserel.rows)

path := (*C.Path)(unsafe.Pointer(C.create_foreignscan_path(root, baserel,
baserel.rows, startupCost, totalCost, (*C.List)(nil),
nil, nil, (*C.List)(nil))))
path := (*C.Path)(unsafe.Pointer(C.createDoubanForeignPath(root, baserel, startupCost, totalCost)))
C.add_path(baserel, path)
}

Expand All @@ -297,12 +321,12 @@ func doubanGetForeignPlan(root *C.PlannerInfo,
scanClauses *C.List, outerPlan *C.Plan) *C.ForeignScan {

_, ok := Restore(unsafe.Pointer(baserel.fdw_private)).([]*TargetColumnMeta)
if (!ok) {
if !ok {
ereport(ERRCODE_FDW_ERROR, "type assersion of \"%p\" to \"[]*TargetColumnMeta\" failed ", unsafe.Pointer(baserel.fdw_private))
}
//build the fdw_private list as the 5th arguement of make_foreignscan() if necessary
scan_private :=
(*C.List)(unsafe.Pointer(C.lcons(unsafe.Pointer(baserel.fdw_private), (*C.List)(nil))))
scan_private :=
(*C.List)(unsafe.Pointer(C.lcons(unsafe.Pointer(baserel.fdw_private), (*C.List)(nil))))

newScanClauses :=
(*C.List)(unsafe.Pointer(C.extract_actual_clauses(scanClauses, C.bool(0))))
Expand All @@ -321,7 +345,7 @@ func doubanBeginForeignScan(node *C.ForeignScanState,

// Do nothing in EXPLAIN (no ANALYZE) case.
// macro "EXEC_FLAG_EXPLAIN_ONLY" means 0x0001
if int(eflags) & 0x0001 != 0 {
if int(eflags)&0x0001 != 0 {
return
}

Expand All @@ -337,7 +361,7 @@ func doubanBeginForeignScan(node *C.ForeignScanState,
return // ereport will cause the statement jumped out of the execution.
}

if (len(items) < MovieRankingTop250Num) {
if len(items) < MovieRankingTop250Num {
//TODO: change info into warning
info("%d items expected from Douban.com, but only %d returned actually", MovieRankingTop250Num, len(items))
}
Expand All @@ -349,11 +373,11 @@ func doubanBeginForeignScan(node *C.ForeignScanState,
fdwPrivate := unsafe.Pointer(uintptr(C.list_nth(privateList, C.int(FDW_PRIVATE_INDEX_ATTRUSED))))

metas, ok := Restore(fdwPrivate).([]*TargetColumnMeta)
if (!ok) {
if !ok {
/*
the long-jump in ereport will directly jump into the Postgres's C-stack,
I'm not sure if the machanism of "defer" in Go would take effect
*/
the long-jump in ereport will directly jump into the Postgres's C-stack,
I'm not sure if the machanism of "defer" in Go would take effect
*/
Unref(fdwPrivate)
ereport(ERRCODE_FDW_ERROR, "internal error: type assersion of \"%p\" to \"[]*TargetColumnMeta\" failed ", fdwPrivate)
}
Expand All @@ -374,12 +398,12 @@ func doubanIterateForeignScan(node *C.ForeignScanState) *C.TupleTableSlot {
slot := (*C.TupleTableSlot)(unsafe.Pointer(sstate.ss_ScanTupleSlot))
tupDesc := (C.TupleDesc)(unsafe.Pointer(slot.tts_tupleDescriptor))
dbstate, ok := Restore(unsafe.Pointer(node.fdw_state)).(*DoubanScanState)
if (!ok) {
if !ok {
/*
the long-jump in ereport will directly jump into the Postgres's C-stack,
I'm not sure if the machanism of "defer" in Go would take effect
*/
ereport(ERRCODE_FDW_ERROR, "internal error: type assersion of \"%p\" to \"*DoubanScanState\" failed ",
the long-jump in ereport will directly jump into the Postgres's C-stack,
I'm not sure if the machanism of "defer" in Go would take effect
*/
ereport(ERRCODE_FDW_ERROR, "internal error: type assersion of \"%p\" to \"*DoubanScanState\" failed ",
unsafe.Pointer(node.fdw_state))
}

Expand All @@ -388,31 +412,31 @@ func doubanIterateForeignScan(node *C.ForeignScanState) *C.TupleTableSlot {
}

if dbstate.currentRow < 0 || dbstate.currentRow > iterateMax {
ereport(ERRCODE_FDW_ERROR, "internal error: \"DoubanScanState.currentRow\" %d beyond the upper value %d",
ereport(ERRCODE_FDW_ERROR, "internal error: \"DoubanScanState.currentRow\" %d beyond the upper value %d",
dbstate.currentRow, iterateMax)
}

natts := int(tupDesc.natts)

C.memset(unsafe.Pointer(slot.tts_values), 0x00, C.size_t(C.sizeof_Datum * tupDesc.natts))
C.memset(unsafe.Pointer(slot.tts_isnull), C.int(1), C.size_t(C.sizeof_bool * tupDesc.natts))
C.memset(unsafe.Pointer(slot.tts_values), 0x00, C.size_t(C.sizeof_Datum*tupDesc.natts))
C.memset(unsafe.Pointer(slot.tts_isnull), C.int(1), C.size_t(C.sizeof_bool*tupDesc.natts))
C.ExecClearTuple(slot)
/*
* all the Top250 items were retrieved, it's time to stop the iteration
* TODO: if the limit clause pushdown were implemented, the following if-condition
* should be modified
*/
* all the Top250 items were retrieved, it's time to stop the iteration
* TODO: if the limit clause pushdown were implemented, the following if-condition
* should be modified
*/
if dbstate.currentRow == iterateMax {
return slot
}

datumsSlice := (*[1 << 30]C.Datum)(unsafe.Pointer(slot.tts_values))[:natts:natts]
isnullsSlice := (*[1 << 30]C.bool)(unsafe.Pointer(slot.tts_isnull))[:natts:natts]
for _, val := range dbstate.attrsRetrieved {
isnullsSlice[val.attrNum - 1] = C.bool(0)
datumsSlice[val.attrNum - 1] = val.convert2Datum(&(dbstate.resultSet[dbstate.currentRow]))
isnullsSlice[val.attrNum-1] = C.bool(0)
datumsSlice[val.attrNum-1] = val.convert2Datum(&(dbstate.resultSet[dbstate.currentRow]))
}
C.ExecStoreVirtualTuple(slot);
C.ExecStoreVirtualTuple(slot)
dbstate.currentRow += 1

return slot
Expand All @@ -427,12 +451,12 @@ func doubanEndForeignScan(node *C.ForeignScanState) {
//export doubanReScanForeignScan
func doubanReScanForeignScan(node *C.ForeignScanState) {
dbstate, ok := Restore(unsafe.Pointer(node.fdw_state)).(*DoubanScanState)
if (!ok) {
if !ok {
/*
the long-jump in ereport will directly jump into the Postgres's C-stack,
I'm not sure if the machanism of "defer" in Go would take effect
*/
ereport(ERRCODE_FDW_ERROR, "internal error: type assersion of \"%p\" to \"*DoubanScanState\" failed ",
the long-jump in ereport will directly jump into the Postgres's C-stack,
I'm not sure if the machanism of "defer" in Go would take effect
*/
ereport(ERRCODE_FDW_ERROR, "internal error: type assersion of \"%p\" to \"*DoubanScanState\" failed ",
unsafe.Pointer(node.fdw_state))
}

Expand All @@ -442,7 +466,7 @@ func doubanReScanForeignScan(node *C.ForeignScanState) {
} else {
sstate := (*C.ScanState)(unsafe.Pointer(&(node.ss)))
rel := (*C.RelationData)(unsafe.Pointer(sstate.ss_currentRelation))

rank := getRankNameFromForeginTable(rel)

items, err := RetrieveRankingData(rank, 50) //TODO: constant variable 50 should be changed
Expand Down Expand Up @@ -472,15 +496,15 @@ func doubanExplainForeignScan(node *C.ForeignScanState,
if int(es.costs) > 0 {
detailTitle := C.CString("Movie items")

C.ExplainPropertyLong(detailTitle, C.long(MovieRankingTop250Num), es)
C.explainPropertyInteger(detailTitle, C.long(MovieRankingTop250Num), es)
}
}

//export doubanAnalyzeForeignTable
func doubanAnalyzeForeignTable(relation *C.RelationData,
aquireSampleRowsFunc *C.AcquireSampleRowsFunc, totalpages *C.uint) C.bool {
*totalpages = 1
return C.bool(0) /* the foreign data cannot be sampled since it's a web api */
return C.bool(0) /* the foreign data cannot be sampled since it's a web api */
}

//export checkOptionName
Expand Down Expand Up @@ -516,7 +540,7 @@ func getRankNameFromForeginTable(rel *C.RelationData) string {
}

optionCount := int(C.list_length(table.options))
for i := 0; i < optionCount; i++ {
for i := 0; i < optionCount; i++ {
def := (*C.DefElem)(unsafe.Pointer(uintptr(C.list_nth(table.options, C.int(i)))))
if strings.ToLower(C.GoString(def.defname)) == "rank_name" {
// we don't need to worry about the correctness of def value
Expand All @@ -526,12 +550,12 @@ func getRankNameFromForeginTable(rel *C.RelationData) string {
}

/* "rank_name" option not found */
relationName := (* C.NameData)(unsafe.Pointer(&(((*C.FormData_pg_class)(unsafe.Pointer(rel.rd_rel))).relname)))
relationName := (*C.NameData)(unsafe.Pointer(&(((*C.FormData_pg_class)(unsafe.Pointer(rel.rd_rel))).relname)))
tabname := C.GoString(&(relationName.data[0]))
ereport(ERRCODE_FDW_INVALID_OPTION_NAME,
ereport(ERRCODE_FDW_INVALID_OPTION_NAME,
"the \"rank_name\" option not specified while defining the foreign table \"%s\"", tabname)

return "" /* avoid the compile error */
return "" /* avoid the compile error */
}

func main() {}
Loading

0 comments on commit 0a18870

Please sign in to comment.