Skip to content

Commit

Permalink
progress on Teller ETL service
Browse files Browse the repository at this point in the history
  • Loading branch information
tmyracle committed Jan 15, 2024
1 parent a777ef0 commit d2a36b1
Show file tree
Hide file tree
Showing 21 changed files with 583 additions and 27 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ NX_FINICITY_PARTNER_SECRET=
# Teller API keys (https://teller.io)
NX_TELLER_SIGNING_SECRET=
NX_TELLER_APP_ID=
NX_TELLER_ENV=sandbox

# Email credentials
NX_POSTMARK_FROM_ADDRESS=[email protected]
Expand Down
6 changes: 5 additions & 1 deletion apps/server/src/app/lib/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
FinicityWebhookHandler,
PlaidWebhookHandler,
TellerService,
TellerETL,
TellerWebhookHandler,
InsightService,
SecurityPricingService,
Expand Down Expand Up @@ -149,15 +150,18 @@ const tellerService = new TellerService(
logger.child({ service: 'TellerService' }),
prisma,
teller,
new TellerETL(logger.child({ service: 'TellerETL' }), prisma, teller),
cryptoService,
getTellerWebhookUrl(),
true
env.NX_TELLER_ENV === 'sandbox'
)

// account-connection

const accountConnectionProviderFactory = new AccountConnectionProviderFactory({
plaid: plaidService,
finicity: finicityService,
teller: tellerService,
})

const transactionStrategy = new TransactionBalanceSyncStrategy(
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const envSchema = z.object({

NX_TELLER_SIGNING_SECRET: z.string().default('REPLACE_THIS'),
NX_TELLER_APP_ID: z.string().default('REPLACE_THIS'),
NX_TELLER_ENV: z.string().default('sandbox'),

NX_SENTRY_DSN: z.string().optional(),
NX_SENTRY_ENV: z.string().optional(),
Expand Down
1 change: 1 addition & 0 deletions libs/server/features/src/providers/teller/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './teller.webhook'
export * from './teller.service'
export * from './teller.etl'
267 changes: 267 additions & 0 deletions libs/server/features/src/providers/teller/teller.etl.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
import type { AccountConnection, PrismaClient } from '@prisma/client'
import type { Logger } from 'winston'
import { SharedUtil, AccountUtil, type SharedType } from '@maybe-finance/shared'
import type { FinicityApi, FinicityTypes } from '@maybe-finance/finicity-api'
import type { TellerApi, TellerTypes } from '@maybe-finance/teller-api'
import { DbUtil, TellerUtil, type IETL } from '@maybe-finance/server/shared'
import { Prisma } from '@prisma/client'
import _ from 'lodash'
import { DateTime } from 'luxon'

export type TellerRawData = {
accounts: TellerTypes.Account[]
transactions: TellerTypes.Transaction[]
transactionsDateRange: SharedType.DateRange<DateTime>
}

export type TellerData = {
accounts: TellerTypes.Account[]
transactions: TellerTypes.Transaction[]
transactionsDateRange: SharedType.DateRange<DateTime>
}

type Connection = Pick<AccountConnection, 'id' | 'userId' | 'tellerInstitutionId'>

export class TellerETL implements IETL<Connection, TellerRawData, TellerData> {
public constructor(
private readonly logger: Logger,
private readonly prisma: PrismaClient,
private readonly teller: Pick<TellerApi, 'getAccounts' | 'getTransactions'>
) {}

async extract(connection: Connection): Promise<TellerRawData> {
if (!connection.tellerInstitutionId) {
throw new Error(`connection ${connection.id} is missing tellerInstitutionId`)
}

const user = await this.prisma.user.findUniqueOrThrow({
where: { id: connection.userId },
select: {
id: true,
tellerUserId: true,
},
})

if (!user.tellerUserId) {
throw new Error(`user ${user.id} is missing tellerUserId`)
}

// TODO: Check if Teller supports date ranges for transactions
const transactionsDateRange = {
start: DateTime.now().minus(TellerUtil.TELLER_WINDOW_MAX),
end: DateTime.now(),
}

const accounts = await this._extractAccounts(user.tellerUserId)

const transactions = await this._extractTransactions(
user.tellerUserId,
accounts.map((a) => a.id),
transactionsDateRange
)

this.logger.info(
`Extracted Teller data for customer ${user.tellerUserId} accounts=${accounts.length} transactions=${transactions.length}`,
{ connection: connection.id, transactionsDateRange }
)

return {
accounts,
transactions,
transactionsDateRange,
}
}

async transform(_connection: Connection, data: TellerData): Promise<TellerData> {
return {
...data,
}
}

async load(connection: Connection, data: TellerData): Promise<void> {
await this.prisma.$transaction([
...this._loadAccounts(connection, data),
...this._loadTransactions(connection, data),
])

this.logger.info(`Loaded Teller data for connection ${connection.id}`, {
connection: connection.id,
})
}

private async _extractAccounts(tellerUserId: string) {
const { accounts } = await this.teller.getAccounts({ accessToken: undefined })

return accounts.filter(
(a) => a.institutionLoginId.toString() === institutionLoginId && a.currency === 'USD'
)
}

private _loadAccounts(connection: Connection, { accounts }: Pick<TellerData, 'accounts'>) {
return [
// upsert accounts
...accounts.map((tellerAccount) => {
return this.prisma.account.upsert({
where: {
accountConnectionId_tellerAccountId: {
accountConnectionId: connection.id,
tellerAccountId: tellerAccount.id,
},
},
create: {
type: TellerUtil.getType(tellerAccount.type),
provider: 'teller',
categoryProvider: PlaidUtil.plaidTypesToCategory(plaidAccount.type),
subcategoryProvider: plaidAccount.subtype ?? 'other',
accountConnectionId: connection.id,
plaidAccountId: plaidAccount.account_id,
name: tellerAccount.name,
plaidType: tellerAccount.type,
plaidSubtype: tellerAccount.subtype,
mask: plaidAccount.mask,
...PlaidUtil.getAccountBalanceData(
plaidAccount.balances,
plaidAccount.type
),
},
update: {
type: TellerUtil.getType(tellerAccount.type),
categoryProvider: PlaidUtil.plaidTypesToCategory(tellerAccount.type),
subcategoryProvider: tellerAccount.subtype ?? 'other',
plaidType: tellerAccount.type,
plaidSubtype: tellerAccount.subtype,
..._.omit(
PlaidUtil.getAccountBalanceData(
plaidAccount.balances,
plaidAccount.type
),
['currentBalanceStrategy', 'availableBalanceStrategy']
),
},
})
}),
// any accounts that are no longer in Plaid should be marked inactive
this.prisma.account.updateMany({
where: {
accountConnectionId: connection.id,
AND: [
{ tellerAccountId: { not: null } },
{ tellerAccountId: { notIn: accounts.map((a) => a.id) } },
],
},
data: {
isActive: false,
},
}),
]
}

private async _extractTransactions(
customerId: string,
accountIds: string[],
dateRange: SharedType.DateRange<DateTime>
) {
const accountTransactions = await Promise.all(
accountIds.map((accountId) =>
SharedUtil.paginate({
pageSize: 1000, // https://api-reference.finicity.com/#/rest/api-endpoints/transactions/get-customer-account-transactions
fetchData: async (offset, count) => {
const transactions = await SharedUtil.withRetry(
() =>
this.teller.getTransactions({
accountId,
accessToken: undefined,
fromDate: dateRange.start.toUnixInteger(),
toDate: dateRange.end.toUnixInteger(),
start: offset + 1,
limit: count,
}),
{
maxRetries: 3,
}
)

return transactions
},
})
)
)

return accountTransactions.flat()
}

private _loadTransactions(
connection: Connection,
{
transactions,
transactionsDateRange,
}: Pick<TellerData, 'transactions' | 'transactionsDateRange'>
) {
if (!transactions.length) return []

const txnUpsertQueries = _.chunk(transactions, 1_000).map((chunk) => {
return this.prisma.$executeRaw`
INSERT INTO transaction (account_id, teller_transaction_id, date, name, amount, pending, currency_code, merchant_name, teller_type, teller_category)
VALUES
${Prisma.join(
chunk.map((tellerTransaction) => {
const {
id,
account_id,
description,
amount,
status,
type,
details,
date,
} = tellerTransaction
return Prisma.sql`(
(SELECT id FROM account WHERE account_connection_id = ${
connection.id
} AND teller_account_id = ${account_id.toString()}),
${id},
${date}::date,
${[description].filter(Boolean).join(' ')},
${DbUtil.toDecimal(-amount)},
${status === 'pending'},
${'USD'},
${details.counterparty.name ?? ''},
${type},
${details.category ?? ''},
)`
})
)}
ON CONFLICT (teller_transaction_id) DO UPDATE
SET
name = EXCLUDED.name,
amount = EXCLUDED.amount,
pending = EXCLUDED.pending,
merchant_name = EXCLUDED.merchant_name,
teller_type = EXCLUDED.teller_type,
teller_category = EXCLUDED.teller_category;
`
})

return [
// upsert transactions
...txnUpsertQueries,
// delete teller-specific transactions that are no longer in teller
this.prisma.transaction.deleteMany({
where: {
account: {
accountConnectionId: connection.id,
},
AND: [
{ tellerTransactionId: { not: null } },
{ tellerTransactionId: { notIn: transactions.map((t) => `${t.id}`) } },
],
date: {
gte: transactionsDateRange.start.startOf('day').toJSDate(),
lte: transactionsDateRange.end.endOf('day').toJSDate(),
},
},
}),
]
}
}
Loading

0 comments on commit d2a36b1

Please sign in to comment.