Skip to content

Commit

Permalink
add local file import support
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Dec 26, 2024
1 parent 8b77dd3 commit d4e778f
Show file tree
Hide file tree
Showing 23 changed files with 222 additions and 171 deletions.
6 changes: 6 additions & 0 deletions create-ln.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ do
ln -s $f /opt/data/tis/libs/plugins/${f##*/}
done ;

cd /opt/misc/tis-plugins-commercial
sh /opt/misc/tis-plugins-commercial/create-ln.sh

cd /opt/misc/tis-sqlserver-plugin
sh /opt/misc/tis-sqlserver-plugin/create-ln.sh

#for tis-scala-compiler-dependencies
#rm -f /opt/data/tis/libs/tis-scala-compiler-dependencies/*
#cd ./tis-scala-compiler-dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
**/
public
interface RunningContext {

public String getDbName();
public String getTable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@
import com.google.common.collect.Maps;
import com.qlangtech.tis.common.utils.Assert;
import com.qlangtech.tis.manage.common.TisUTF8;
import com.qlangtech.tis.plugin.ds.*;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;
import com.qlangtech.tis.plugin.ds.DataDumpers;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IDataSourceDumper;
import com.qlangtech.tis.plugin.ds.JDBCTypes;
import com.qlangtech.tis.plugin.ds.RunningContext;
import com.qlangtech.tis.plugin.ds.TISTable;
import com.qlangtech.tis.plugin.ds.TableInDB;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
Expand All @@ -31,7 +40,6 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -46,7 +54,6 @@ protected Connection getConnection(String jdbcUrl, String username, String passw
return DriverManager.getConnection(jdbcUrl, StringUtils.trimToNull(username), StringUtils.trimToNull(password));
}


@Override
public DBConfig getDbConfig() {
throw new IllegalStateException();
Expand All @@ -61,15 +68,6 @@ public void visitFirstConnection(IConnProcessor connProcessor) {
public void refresh() {

}
// @Override
// public void refectTableInDB(TableInDB tabs, Connection conn) throws SQLException {
// throw new UnsupportedOperationException();
// }

// @Override
// public String identityValue() {
// return "mockDs";
// }

/**
* 模拟Employee表的导入
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1180,8 +1180,8 @@ public void doUpdateDatax(Context context) throws Exception {

IAppSource.cleanAppSourcePluginStoreCache(null, dataxName);
IAppSource.cleanAppSourcePluginStoreCache(this, dataxName);
SelectedTabExtend.clearTabExtend(null,dataxName);
SelectedTabExtend.clearTabExtend(this,dataxName);
SelectedTabExtend.clearTabExtend(null, dataxName);
SelectedTabExtend.clearTabExtend(this, dataxName);

DataXJobSubmit.getPowerJobSubmit().ifPresent((submit) -> {
submit.saveJob(this, context, old);
Expand Down Expand Up @@ -1348,7 +1348,7 @@ public void doGetTableMapper(Context context) {
}

if (!dataxReader.hasMulitTable()) {
throw new IllegalStateException("reader has not set table at least");
throw new IllegalStateException("reader (" + dataxReader.getClass().getSimpleName() + ") has not set table at least");
}
List<TableAlias> tmapList = Lists.newArrayList();
for (ISelectedTab selectedTab : dataxReader.getSelectedTabs()) {
Expand Down Expand Up @@ -1550,7 +1550,7 @@ public boolean validate(IFieldErrorHandler msgHandler, Context context, String f
addErrorMessage(context, "请至少选择一个主键列");
postMCols.validateFaild = true;
}

writerCols.addAll(postMCols.writerCols);
return !postMCols.validateFaild;
}
}))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -50,6 +51,25 @@ public static CMeta convert(ColumnMetaData c) {
return c.convert();
}

public static List<ColumnMetaData> convert(List<CMeta> cs) {
int[] index = new int[1];
return cs.stream().map((cm) -> {
//int index, String key, DataType type, boolean pk, boolean nullable
return new ColumnMetaData(index[0]++, cm.getName(), cm.getType(), cm.isPk(), cm.isNullable());

}).collect(Collectors.toList());

// ColumnMetaData c = this;
// CMeta cmeta = createCmeta();
// cmeta.setName(c.getName());
// cmeta.setComment(c.getComment());
// cmeta.setPk(c.isPk());
// cmeta.setType(c.getType());
// cmeta.setNullable(c.isNullable());
//
// return c.convert();
}

public static void fillSelectedTabMeta(ISelectedTab tab,
Function<ISelectedTab, Map<String, ColumnMetaData>> tableColsMetaGetter) {
Map<String, ColumnMetaData> colsMeta = tableColsMetaGetter.apply(tab);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Licensed to the Apache Software Foundation (ASF) under one
Expand Down Expand Up @@ -39,12 +40,14 @@ public DefaultTab(String dataXName, List<CMeta> writerCols) {

@Override
public List<String> getPrimaryKeys() {
throw new UnsupportedOperationException();
return writerCols.stream()
.filter((col) -> col.isPk())
.map((col) -> col.getName()).collect(Collectors.toList());
}

@Override
public List<IColMetaGetter> overwriteCols(IMessageHandler pluginCtx,boolean includeContextParams) {
throw new UnsupportedOperationException();
public List<IColMetaGetter> overwriteCols(IMessageHandler pluginCtx, boolean includeContextParams) {
return writerCols.stream().collect(Collectors.toList());
}

public DefaultTab(String tabName) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Optional;
import java.util.Properties;

/**
* @author: 百岁([email protected]
Expand Down Expand Up @@ -83,25 +84,29 @@ public HiveTable(String name) {
this.name = name;
}

public static class StoredAs {
public static abstract class StoredAs {
public final String inputFormat;
public final String outputFormat;

private final Object serdeInfo;
// private final Object serdeInfo;

/**
* @param inputFormat
* @param outputFormat
* @param serdeInfo org.apache.hadoop.hive.metastore.api.SerdeInfo
* // @param serdeInfo org.apache.hadoop.hive.metastore.api.SerdeInfo
*/
public StoredAs(String inputFormat, String outputFormat, Object serdeInfo) {
public StoredAs(String inputFormat, String outputFormat) {
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.serdeInfo = serdeInfo;
// this.serdeInfo = serdeInfo;
}

public <SerDeInfo> SerDeInfo getSerdeInfo() {
return (SerDeInfo) serdeInfo;
}
public abstract Properties getSerdeProperties(HiveTable table);

public abstract String getSerializationLib();

// public <SerDeInfo> SerDeInfo getSerdeInfo() {
// return (SerDeInfo) serdeInfo;
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.RunningContext;
import com.qlangtech.tis.plugin.ds.TableInDB;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public SourceColMetaGetter(IDataxReader dataXReader) {

private Map<String, ColumnMetaData> getColMetaDataMap(IDataxReader dataXReader, TableMap tableMapper) {
try {
return ColumnMetaData.toMap(dataXReader.getTableMetadata(false, EntityName.parse(tableMapper.getFrom())));
return ColumnMetaData.toMap(dataXReader.getTableMetadata(false, tableMapper));
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,11 @@ public List<IColMetaGetter> overwriteCols(IMessageHandler pluginCtx, boolean inc
if (transformerRules.isPresent()) {
ITransformerBuildInfo transformerBuilder = transformerRules.get().createTransformerBuildInfo((IPluginContext) pluginCtx);

List<OutputParameter> overwriteColsWithContextParams
= transformerBuilder.overwriteColsWithContextParams(this.getCols());

List<OutputParameter> outParams = includeContextParams
? transformerBuilder.overwriteColsWithContextParams(this.getCols())
? overwriteColsWithContextParams
: transformerBuilder.tranformerColsWithoutContextParams();
return outParams.stream().map((param) -> param).collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void appendExternalJsonProp(IPropertyType propertyType, JSONObject biz) {
// ElementCreatorFactory.super.appendExternalJsonProp(propertyType, biz);
}


protected List<CMeta> getColsCandidate() {
List<CMeta> colsCandidate = SelectedTab.getSelectedCols();
return colsCandidate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
**/
@Public
public abstract class DataSourceFactory implements Describable<DataSourceFactory>, Serializable, DBIdentity, DataSourceMeta, Wrapper {
// public static final ZoneId DEFAULT_SERVER_TIME_ZONE = MQListenerFactory.DEFAULT_SERVER_TIME_ZONE; // ZoneId.systemDefault();// ZoneId.of("Asia/Shanghai");
// public static final ZoneId DEFAULT_SERVER_TIME_ZONE = MQListenerFactory.DEFAULT_SERVER_TIME_ZONE; // ZoneId.systemDefault();// ZoneId.of("Asia/Shanghai");
public static final String DS_TYPE_MYSQL = "MySQL";
public static final String DS_TYPE_MYSQL_V8 = DS_TYPE_MYSQL + "-V8";

Expand Down Expand Up @@ -258,7 +258,7 @@ protected List<ColumnMetaData> parseTableColMeta(boolean inSink, String jdbcUrl,
pkCols.add(columnName);
}

return wrapColsMeta(inSink, table, columns1, pkCols);
return wrapColsMeta(inSink, table, columns1, pkCols, conn);
} finally {
closeResultSet(columns1);
closeResultSet(primaryKeys);
Expand All @@ -267,8 +267,9 @@ protected List<ColumnMetaData> parseTableColMeta(boolean inSink, String jdbcUrl,
// return columns;
}

public List<ColumnMetaData> wrapColsMeta(boolean inSink, EntityName table, ResultSet columns1) throws SQLException, TableNotFoundException {
return wrapColsMeta(inSink, table, columns1, Collections.emptySet());
public List<ColumnMetaData> wrapColsMeta(
boolean inSink, EntityName table, ResultSet columns1, JDBCConnection conn) throws SQLException, TableNotFoundException {
return wrapColsMeta(inSink, table, columns1, Collections.emptySet(), conn);
}

public static final String KEY_COLUMN_NAME = "COLUMN_NAME";
Expand All @@ -281,9 +282,13 @@ public List<ColumnMetaData> wrapColsMeta(boolean inSink, EntityName table, Resul
public static final String KEY_DATA_TYPE = "DATA_TYPE";
public static final String KEY_COLUMN_SIZE = "COLUMN_SIZE";

public List<ColumnMetaData> wrapColsMeta(
boolean inSink, EntityName table, ResultSet columns1, Set<String> pkCols) throws SQLException, TableNotFoundException {
return this.wrapColsMeta(inSink, table, columns1, new CreateColumnMeta(pkCols, columns1));
public final List<ColumnMetaData> wrapColsMeta(
boolean inSink, EntityName table, ResultSet columns1, Set<String> pkCols, JDBCConnection conn) throws SQLException, TableNotFoundException {
return this.wrapColsMeta(inSink, table, columns1, createColumnMetaBuilder(table, columns1, pkCols, conn));
}

protected CreateColumnMeta createColumnMetaBuilder(EntityName table, ResultSet columns1, Set<String> pkCols, JDBCConnection conn) {
return new CreateColumnMeta(pkCols, columns1);
}

public List<ColumnMetaData> wrapColsMeta(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import com.google.common.collect.Maps;
import com.qlangtech.tis.datax.IDataxProcessor.TableMap;
import com.qlangtech.tis.extension.Describable;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;

Expand Down Expand Up @@ -52,6 +53,9 @@ default TableInDB getTablesInDB() {
throw new UnsupportedOperationException();
}

default List<ColumnMetaData> getTableMetadata(boolean inSink, TableMap tableMapper) throws TableNotFoundException {
return getTableMetadata(inSink, EntityName.parse(tableMapper.getFrom()));
}

/**
* Get table column metaData list
Expand Down
Loading

0 comments on commit d4e778f

Please sign in to comment.