Skip to content

Commit

Permalink
fix(Postgres Node): Re-use connection pool across executions (n8n-io#…
Browse files Browse the repository at this point in the history
…12346)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <[email protected]>
  • Loading branch information
despairblue and netroy authored Dec 27, 2024
1 parent 7b2630d commit 2ca37f5
Show file tree
Hide file tree
Showing 10 changed files with 501 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ export async function searchSchema(this: ILoadOptionsFunctions): Promise<INodeLi
name: s.schema_name as string,
value: s.schema_name as string,
}));
await db.$pool.end();
return { results };
}

Expand All @@ -122,6 +121,5 @@ export async function searchTables(this: ILoadOptionsFunctions): Promise<INodeLi
name: s.table_name as string,
value: s.table_name as string,
}));
await db.$pool.end();
return { results };
}
1 change: 0 additions & 1 deletion packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ export class PostgresTrigger implements INodeType {
}
} finally {
connection.client.removeListener('notification', onNotification);
if (!db.$pool.ending) await db.$pool.end();
}
};

Expand Down
5 changes: 0 additions & 5 deletions packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ export class PostgresV1 implements INodeType {

const db = pgp(config);
await db.connect();
await db.$pool.end();
} catch (error) {
return {
status: 'Error',
Expand Down Expand Up @@ -409,16 +408,12 @@ export class PostgresV1 implements INodeType {

returnItems = wrapData(updateItems);
} else {
await db.$pool.end();
throw new NodeOperationError(
this.getNode(),
`The operation "${operation}" is not supported!`,
);
}

// shuts down the connection pool associated with the db object to allow the process to finish
await db.$pool.end();

return [returnItems];
}
}
34 changes: 15 additions & 19 deletions packages/nodes-base/nodes/Postgres/v2/actions/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,21 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
operation,
} as PostgresType;

try {
switch (postgresNodeData.resource) {
case 'database':
returnData = await database[postgresNodeData.operation].execute.call(
this,
runQueries,
items,
options,
db,
);
break;
default:
throw new NodeOperationError(
this.getNode(),
`The operation "${operation}" is not supported!`,
);
}
} finally {
if (!db.$pool.ending) await db.$pool.end();
switch (postgresNodeData.resource) {
case 'database':
returnData = await database[postgresNodeData.operation].execute.call(
this,
runQueries,
items,
options,
db,
);
break;
default:
throw new NodeOperationError(
this.getNode(),
`The operation "${operation}" is not supported!`,
);
}

if (operation === 'select' && items.length > 1 && !node.executeOnce) {
Expand Down
46 changes: 19 additions & 27 deletions packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi

const { db } = await configurePostgres.call(this, credentials, options);

try {
const response = await db.any('SELECT schema_name FROM information_schema.schemata');

return {
results: response.map((schema) => ({
name: schema.schema_name as string,
value: schema.schema_name as string,
})),
};
} finally {
if (!db.$pool.ending) await db.$pool.end();
}
const response = await db.any('SELECT schema_name FROM information_schema.schemata');

return {
results: response.map((schema) => ({
name: schema.schema_name as string,
value: schema.schema_name as string,
})),
};
}
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres');
Expand All @@ -32,19 +28,15 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
extractValue: true,
}) as string;

try {
const response = await db.any(
'SELECT table_name FROM information_schema.tables WHERE table_schema=$1',
[schema],
);

return {
results: response.map((table) => ({
name: table.table_name as string,
value: table.table_name as string,
})),
};
} finally {
if (!db.$pool.ending) await db.$pool.end();
}
const response = await db.any(
'SELECT table_name FROM information_schema.tables WHERE table_schema=$1',
[schema],
);

return {
results: response.map((table) => ({
name: table.table_name as string,
value: table.table_name as string,
})),
};
}
18 changes: 7 additions & 11 deletions packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
extractValue: true,
}) as string;

try {
const columns = await getTableSchema(db, schema, table);

return columns.map((column) => ({
name: column.column_name,
value: column.column_name,
description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`,
}));
} finally {
if (!db.$pool.ending) await db.$pool.end();
}
const columns = await getTableSchema(db, schema, table);

return columns.map((column) => ({
name: column.column_name,
value: column.column_name,
description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`,
}));
}

export async function getColumnsMultiOptions(
Expand Down
56 changes: 26 additions & 30 deletions packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,34 +63,30 @@ export async function getMappingColumns(
extractValue: true,
}) as string;

try {
const columns = await getTableSchema(db, schema, table, { getColumnsForResourceMapper: true });
const unique = operation === 'upsert' ? await uniqueColumns(db, table, schema) : [];
const enumInfo = await getEnums(db);
const fields = await Promise.all(
columns.map(async (col) => {
const canBeUsedToMatch =
operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true;
const type = mapPostgresType(col.data_type);
const options =
type === 'options' ? getEnumValues(enumInfo, col.udt_name as string) : undefined;
const hasDefault = Boolean(col.column_default);
const isGenerated = col.is_generated === 'ALWAYS' || col.identity_generation === 'ALWAYS';
const nullable = col.is_nullable === 'YES';
return {
id: col.column_name,
displayName: col.column_name,
required: !nullable && !hasDefault && !isGenerated,
defaultMatch: (col.column_name === 'id' && canBeUsedToMatch) || false,
display: true,
type,
canBeUsedToMatch,
options,
};
}),
);
return { fields };
} finally {
if (!db.$pool.ending) await db.$pool.end();
}
const columns = await getTableSchema(db, schema, table, { getColumnsForResourceMapper: true });
const unique = operation === 'upsert' ? await uniqueColumns(db, table, schema) : [];
const enumInfo = await getEnums(db);
const fields = await Promise.all(
columns.map(async (col) => {
const canBeUsedToMatch =
operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true;
const type = mapPostgresType(col.data_type);
const options =
type === 'options' ? getEnumValues(enumInfo, col.udt_name as string) : undefined;
const hasDefault = Boolean(col.column_default);
const isGenerated = col.is_generated === 'ALWAYS' || col.identity_generation === 'ALWAYS';
const nullable = col.is_nullable === 'YES';
return {
id: col.column_name,
displayName: col.column_name,
required: !nullable && !hasDefault && !isGenerated,
defaultMatch: (col.column_name === 'id' && canBeUsedToMatch) || false,
display: true,
type,
canBeUsedToMatch,
options,
};
}),
);
return { fields };
}
Loading

0 comments on commit 2ca37f5

Please sign in to comment.