Skip to content

Commit

Permalink
KYLO-3217 wrangler joins
Browse files Browse the repository at this point in the history
  • Loading branch information
scottreisdorf committed Dec 18, 2018
1 parent 2a606d9 commit aef7ffd
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import {Component, Input, OnInit, TemplateRef, ViewChild} from "@angular/core";
import {FormGroup} from "@angular/forms";
import {DatasetPreviewStepperService} from "./dataset-preview-stepper.service";
/*
Example impl of this class
@Component({selector:"dataset-preview-step",
template:`<ng-template let-data #stepTemplate>
This is my template {{data}}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {FormGroup} from "@angular/forms";
import {TemplateRef, ViewChild} from "@angular/core";

/**
* Contract for an extra step plugged into the dataset preview stepper.
* Any additional step in the preview stepper needs to implement this interface.
*
* The type parameter T is the type of the `data` object the step carries.
*/
export interface PreviewStepperStep<T>{
/**
* Data object of type T associated with this step.
*/
data:T;

/**
* Form group for this step's form controls.
*/
stepControl:FormGroup;

/**
* Template reference for this step.
* NOTE(review): presumably the template the stepper renders as the step's content - confirm against the stepper component.
*/
templateRef: TemplateRef<any>;

/**
* Initializes the step.
* NOTE(review): when the stepper calls this is not visible here - confirm.
*/
init():void;

/**
* Return any saved data you want passed to the subscriber.
*/
onSave():any;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ export class SparkConstants {
* Identifier for the native Hive data source.
*/
static USER_FILE_DATASOURCE = "FILE";

/**
* Column names treated as reserved.
* SparkQueryEngine searches the result columns for a hiveColumnLabel contained in this list, and
* InlineJoinScriptBuilder prefixes a joined column with the table name when its name is in this list.
*/
static RESERVED_COLUMN_NAMES:string[] = ["processing_dttm"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,27 +276,8 @@ export class SparkExpression {
* @throws {ParseException} if the expression cannot be converted to a DataFrame
*/
private static toDataFrame(expression: SparkExpression): string {
let dfVar = null;
let source = expression.source;
if (SparkExpressionType.DATA_FRAME.equals(expression.type)) {
dfVar = SparkConstants.DATA_FRAME_VARIABLE;
}
else if (SparkExpressionType.LITERAL.equals(expression.type)) {
let nLiteral: number = parseInt(expression.source as string);
if (!isNaN(nLiteral)) {
if(nLiteral == 0){
dfVar = SparkConstants.DATA_FRAME_VARIABLE;
source = "";
}else if(nLiteral == -1){
dfVar = "parent";
source = "";
}else {
dfVar = SparkConstants.DATA_FRAME_VARIABLE;
}
}
}
if(dfVar != null){
return dfVar+source;
return SparkConstants.DATA_FRAME_VARIABLE + expression.source;
}
throw new ParseException("Expression cannot be converted to a DataFrame: " + expression.type, expression.start);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,16 @@ export class SparkQueryEngine extends QueryEngine<string> {
sparkScript += "var " + SparkConstants.DATA_FRAME_VARIABLE + " = parent\n";
}
let dsProvider = DATASET_PROVIDER;
let joinDataSetIds :string[] = [];
for (let i = start; i < end; ++i) {
if (!this.states_[i].inactive) {
let state = this.states_[i];
if(state.joinDataSet != undefined && state.joinDataSet != null) {
if(joinDataSetIds.indexOf(state.joinDataSet.datasetId) <0){
joinDataSetIds.push(state.joinDataSet.datasetId);
sparkScript +=state.joinDataSet.joinDataFrameVarScript+"\n";
}

sparkScript += state.joinDataSet.joinScript;
}
sparkScript += SparkConstants.DATA_FRAME_VARIABLE + " = " + SparkConstants.DATA_FRAME_VARIABLE + this.states_[i].script + "\n";
Expand Down Expand Up @@ -462,7 +468,7 @@ export class SparkQueryEngine extends QueryEngine<string> {
return (column.hiveColumnLabel.match(/[.`]/) !== null); // Escaping backticks not supported until Spark 2.0
});
let reserved = _.find(response.results.columns, function (column: any) {
return (column.hiveColumnLabel === "processing_dttm");
return SparkConstants.RESERVED_COLUMN_NAMES.indexOf(column.hiveColumnLabel) >=0;
});

if (typeof invalid != "undefined") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ export class InlineJoinScriptBuilder {

}

private uniqueName = (name:string,cnt?:number):string => {
private uniqueName = (name:string, tableName:string,cnt?:number):string => {
if(cnt == undefined){
cnt =0;
}
if(this.names[name] != undefined){
cnt++;
name+=cnt;
return this.uniqueName(name,cnt);
if(cnt ==0){
name = tableName+"_"+name;
cnt++;
}
else {
cnt++;
name += cnt;
}
return this.uniqueName(name,tableName,cnt);
}
this.names[name]= name;
return name;
Expand All @@ -42,16 +48,22 @@ export class InlineJoinScriptBuilder {
})
}


private getJoinSelectColumns(joinDf:string):ResTarget[] {
return this.joinData.joinDataSet.schema.map(field => {
return this.joinData.joinFields.map(field => {
let fields: string[] = [];
fields.push(joinDf)
fields.push(field.name);
let name = this.joinData.ds.getTableName() + "_" +field.name;
fields.push(field);
let name = field;
name = name.replace(/[^a-zA-Z0-9_\s\)\(-]*/g,'');
name = name.replace(" ","_")
name = name.replace(".","_")
name = this.uniqueName(name);
let cnt = 0;
if(SparkConstants.RESERVED_COLUMN_NAMES.indexOf(name) >=0){
name = this.joinData.ds.getTableName()+"_"+name;
cnt = 1;
}
name = this.uniqueName(name, this.joinData.ds.getTableName(), cnt);
let t: ResTarget = {description: "", val: {fields: fields}, name:name};
return t;
});
Expand Down Expand Up @@ -80,16 +92,16 @@ export class InlineJoinScriptBuilder {

//get the new name in the select if we renamed it
let joinScript = "";
if(isNewJoinDf) {
joinScript = `val ${joinDf} = ${dsProvider}.read("${dsId}").alias("${joinDf}")
`
}
let joinDataFrameVarScript = "";
// if(isNewJoinDf) {
joinDataFrameVarScript = `val ${joinDf} = ${dsProvider}.read("${dsId}").alias("${joinDf}") `
//}
joinScript += `
${df} = ${df}.join(${joinDf},${df}.col("${dfField}").equalTo(${joinDf}.col("${joinField}")),"${joinType}")${joinSelectString}
`

let joinDataset: JoinDataset = {datasetId:dsId,dataframeId:1,joinScript:joinScript, joinField:joinField, dfField:dfField};
let joinDataset: JoinDataset = {datasetId:dsId,dataframeId:joinDf,joinScript:joinScript, joinDataFrameVarScript:joinDataFrameVarScript,joinField:joinField, dfField:dfField};
return joinDataset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class JoinData {
joinField:string;
joinDataSet:PreviewDataSet;
joinType:string;
joinFields:string[];
ds:SparkDataSet

constructor() {
Expand Down Expand Up @@ -47,7 +48,7 @@ export class JoinData {
<div fxLayout="row">
<mat-form-field>
<mat-select placeholder="Current schema" formControlName="currentSchemaField" required>
<mat-select placeholder="Current field" formControlName="currentSchemaField" required>
<mat-option *ngFor="let field of data.cols" [value]="field.field">
{{field.displayName}} - {{field.dataType}}
</mat-option>
Expand All @@ -58,14 +59,25 @@ export class JoinData {
<div fxFlex="10" class="pad-left-sm pad-right-sm"> </div>
<mat-form-field *ngIf="dataSet != undefined">
<mat-select placeholder="Joining schema" formControlName="joinSchemaField" required>
<mat-select placeholder="Joining field" formControlName="joinSchemaField" required>
<mat-option *ngFor="let field of dataSet.schema" [value]="field.name">
{{field.label}} - {{field.dataType}}
</mat-option>
</mat-select>
<mat-error *ngIf="joinSchemaFieldControl.hasError('required')">Required</mat-error>
</mat-form-field>
</div>
<div>
<!-- Multi-select of the joining dataset's columns, bound to the "joinSchemaFields" form control.
     The required error below must test joinSchemaFieldsControl (this control), not the single
     join-field control, otherwise the error follows the wrong select. -->
<mat-form-field *ngIf="dataSet != undefined">
<mat-select placeholder="Joining schema" formControlName="joinSchemaFields" required multiple >
<mat-option *ngFor="let field of dataSet.schema" [value]="field.name">
{{field.label}} - {{field.dataType}}
</mat-option>
</mat-select>
<mat-error *ngIf="joinSchemaFieldsControl.hasError('required')">Required</mat-error>
<mat-hint>Selected {{joinSchemaFieldsControl.value.length}} fields</mat-hint>
</mat-form-field>
</div>
</form>
</div>
</ng-template>`,
Expand All @@ -84,6 +96,8 @@ export class JoinPreviewStepperStep extends AbstractDatasetPreviewStepComponent<

joinTypeControl: FormControl;

joinSchemaFieldsControl: FormControl;

dataSet:PreviewDataSet;

constructor(protected _datasetPreviewStepperService:DatasetPreviewStepperService) {
Expand All @@ -94,19 +108,22 @@ export class JoinPreviewStepperStep extends AbstractDatasetPreviewStepComponent<
this.currentSchemaFieldControl = new FormControl('', [Validators.required]);
this.joinSchemaFieldControl = new FormControl('', [Validators.required]);
this.joinTypeControl = new FormControl('', [Validators.required]);
this.joinSchemaFieldsControl = new FormControl([],[Validators.required]);

this.stepChangedSubscription = this._datasetPreviewStepperService.subscribeToStepChanges(this.onStepChanged.bind(this))

this.stepControl.addControl("joinType", this.joinTypeControl);
this.stepControl.addControl("currentSchemaField", this.currentSchemaFieldControl);
this.stepControl.addControl("joinSchemaField", this.joinSchemaFieldControl);
this.stepControl.addControl("joinSchemaFields",this.joinSchemaFieldsControl)
}


/**
 * Clears the join form: the three single-value controls are set back to an
 * empty string and the multi-select of join fields is set back to an empty array.
 */
resetForm(){
    // Same order as the controls were reset before: current field, join field, join type.
    const singleValueControls = [
        this.currentSchemaFieldControl,
        this.joinSchemaFieldControl,
        this.joinTypeControl
    ];
    singleValueControls.forEach(control => control.setValue(""));

    // The join-fields control holds a list of selected names, so its empty value is an array.
    this.joinSchemaFieldsControl.setValue([]);
}

init() {
Expand Down Expand Up @@ -140,6 +157,13 @@ export class JoinPreviewStepperStep extends AbstractDatasetPreviewStepComponent<
}
}

/**
 * Writes the names of all columns in `this.dataSet.schema` into the
 * join-fields control, so every field of the joining dataset is selected.
 */
private setJoinSchemaFields(){
    // _.map (like _.forEach) yields nothing for a null/undefined collection, so an absent schema gives [].
    const allFieldNames: string[] = _.map(this.dataSet.schema, (column: TableColumn) => column.name);
    this.joinSchemaFieldsControl.setValue(allFieldNames);
}



private onStepChanged(idx:number){
Expand All @@ -149,6 +173,7 @@ export class JoinPreviewStepperStep extends AbstractDatasetPreviewStepComponent<
if(this.joinTypeControl.value == "" || this.currentSchemaFieldControl.value == "" || this.joinSchemaFieldControl.value == "") {
this.autoMatch();
}
this.setJoinSchemaFields();
}
}

Expand All @@ -160,6 +185,7 @@ export class JoinPreviewStepperStep extends AbstractDatasetPreviewStepComponent<
joinData.joinField = this.joinSchemaFieldControl.value;
joinData.joinType = this.joinTypeControl.value;
joinData.joinDataSet = this.dataSet;
joinData.joinFields = this.joinSchemaFieldsControl.value;
return joinData;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ import {InlineJoinScriptBuilder} from "./dataset-join-dialog/inline-join-script-

declare const CodeMirror: any;

// import {moduleName} from "../module-name";

export class WranglerColumn {

dataType: string = null;
Expand Down Expand Up @@ -493,6 +491,9 @@ export class TransformDataComponent implements AfterViewInit, ColumnController,
}
}

/**
* Join the dataframe to another dataset
*/
inlineJoin(){
let data = new DatasetPreviewStepperDialogData(false,"Add");
data.additionalSteps = [];
Expand All @@ -517,6 +518,7 @@ export class TransformDataComponent implements AfterViewInit, ColumnController,
let joinScriptBuilder = new InlineJoinScriptBuilder(this.visualQueryService,this.model,this.engine.getColumns(),joinData);
let joinDataSet:JoinDataset = joinScriptBuilder.build();

// the formula is empty since it is crafted from the "joinData" that is passed to the context
let formula = "";
//if successful register and add it to the list
if(this.model.datasets == undefined){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export interface JoinDataset {

datasetId:string;
dataframeId:number;
dataframeId:string;
joinDataFrameVarScript:string;
joinScript:string;
joinField:string;
dfField:string
Expand Down

0 comments on commit aef7ffd

Please sign in to comment.