Skip to content
This repository has been archived by the owner on Feb 17, 2025. It is now read-only.

Commit

Permalink
feat: create pipe from recipe
Browse files Browse the repository at this point in the history
  • Loading branch information
ogp-weeloong committed Jan 22, 2024
1 parent b355249 commit 0752b23
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 0 deletions.
246 changes: 246 additions & 0 deletions packages/backend/src/controllers/recipes/create-pipe-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
import type { IJSONObject } from '@plumber/types'

import type { RequestHandler } from 'express'
import type { PartialModelObject } from 'objection'
import z from 'zod'

import apps from '@/apps'
import registerConnection from '@/graphql/mutations/register-connection'
import verifyConnection from '@/graphql/mutations/verify-connection'
import { getOrCreateUser } from '@/helpers/auth'
import { validateAndParseEmail } from '@/helpers/email-validator'
import type Step from '@/models/step'
import type User from '@/models/user'

const APP_KEYS: ReadonlySet<string> = new Set(
Object.values(apps).map((app) => app.key),
)

// Intentionally no validation
const jsonObjectSchema = z.custom<IJSONObject>()

const pipeStepSchema = z
.object({
type: z.enum(['trigger', 'action']),
app: z.string().refine(function (appKey) {
return APP_KEYS.has(appKey)
}),
event: z.string().min(1),
connection: z
.discriminatedUnion('command', [
z.object({ command: z.literal('create'), data: jsonObjectSchema }),
z.object({
command: z.literal('reuse-earlier-step'),
position: z.number().gt(1),
}),
])
.nullish(),
parameters: jsonObjectSchema,
})
// Some cursory validation...
.refine(function (schema) {
const selectedApp = apps[schema.app]

// Technically can compute trigger/action keys outside but lazy to do.
if (
schema.type === 'trigger' &&
!selectedApp.triggers.map((trigger) => trigger.key).includes(schema.event)
) {
return false
}

if (
schema.type === 'action' &&
!selectedApp.actions.map((action) => action.key).includes(schema.event)
) {
return false
}

// AuthUrl apps not supported for now
if (selectedApp.auth?.generateAuthUrl) {
return false
}

return true
})

const paramsSchema = z.object({
userEmail: z.string().email().min(1),
pipeName: z.string().min(1),
pipeSteps: z
.array(pipeStepSchema)
.min(2) // Need trigger + at least 1 action
.superRefine(function (steps, context) {
if (steps[0].type !== 'trigger') {
context.addIssue({
code: z.ZodIssueCode.custom,
message: 'First step must be trigger',
fatal: true,
})

return z.NEVER
}

for (const actionStep of steps.slice(1)) {
if (actionStep.type !== 'action') {
context.addIssue({
code: z.ZodIssueCode.custom,
message: 'Must only have action steps after trigger step',
fatal: true,
})
}
}
}),
})

async function createConnection(
user: User,
step: Step,
connectionConfig: IJSONObject,
): Promise<string | null> {
const app = apps[step.appKey]

// Should not happen but doesn't hurt.
if (!app.auth) {
return null
}

const connection = await user.$relatedQuery('connections').insert({
key: step.appKey,
formattedData: connectionConfig,
})

await verifyConnection(
null,
{
input: {
id: connection.id,
},
},
{
currentUser: user,
// any good for hackathon.
} as any,
)

// Registration needs connection IDs to be updated in steps, so might as well
// do it now. There's probably a better-optimized sequence to do connection
// creation + step creation + connection registration, but hackathon so wdv.
await step.$query().patch({
connectionId: connection.id,
})

// If needs registration, re-use our mutation to perform registration.
if (app.auth.connectionRegistrationType) {
await registerConnection(
null,
{
input: {
connectionId: connection.id,
stepId: step.id,
},
},
{
currentUser: user,
// any good for hackathon.
} as any,
)
}

return connection.id
}

async function setupStepConnections(
user: User,
stepsToSetup: Array<{
step: Step
connection: z.infer<typeof paramsSchema>['pipeSteps'][number]['connection']
}>,
): Promise<void> {
const connectionsThusFar: string[] = []

for (const stepToSetup of stepsToSetup) {
if (!stepToSetup.connection) {
await stepToSetup.step.$query().patch({
status: 'completed',
})
connectionsThusFar.push(null)
continue
}

// REFACTOR LATER. Not supposed to be serial await but wdv.
switch (stepToSetup.connection.command) {
case 'reuse-earlier-step': {
const connectionId =
connectionsThusFar[stepToSetup.connection.position - 1]
connectionsThusFar.push(connectionId)
break
}

case 'create': {
const connectionId = await createConnection(
user,
stepToSetup.step,
stepToSetup.connection.data,
)
connectionsThusFar.push(connectionId)
break
}
}

await stepToSetup.step.$query().patch({
status: 'completed',
})
}
}

const createPipeHandler: RequestHandler = async function (req, res) {
const params = paramsSchema.parse(req.body)

// Create user if doesn't already exist.
const email = await validateAndParseEmail(params.userEmail)
if (!email) {
throw new Error('Email is invalid or not whitelisted.')
}
const user = await getOrCreateUser(email)

//
// Setup pipe.
//
// In theory we could re-use our GraphQL but it's more efficient to skip all
// that validation.

const pipe = await user.$relatedQuery('flows').insert({
name: params.pipeName,
})

// Need to create steps first since some connections need step ID.
const steps = await pipe.$relatedQuery('steps').insert(
params.pipeSteps.map(
(step, index): PartialModelObject<Step> => ({
type: step.type,
position: index + 1,
appKey: step.app,
key: step.event,
parameters: step.parameters,
}),
),
)

// Create connections where needed.
await setupStepConnections(
user,
// Ghetto zip
params.pipeSteps.map((s, index) => ({
step: steps[index],
connection: s.connection,
})),
)

// Publish!
await pipe.$query().patch({ active: true })

res.json({ pipeId: pipe.id })
}

export default createPipeHandler
3 changes: 3 additions & 0 deletions packages/backend/src/controllers/recipes/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import createPipeHandler from './create-pipe-handler'

export { createPipeHandler }
2 changes: 2 additions & 0 deletions packages/backend/src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import { Router } from 'express'

import graphQLInstance from '@/helpers/graphql-instance'

import recipesRouter from './recipes'
import webhooksRouter from './webhooks'

const router = Router()

router.use('/graphql', graphQLInstance)
router.use('/webhooks', webhooksRouter)
router.use('/recipes', recipesRouter)

export default router
32 changes: 32 additions & 0 deletions packages/backend/src/routes/recipes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { IRequest } from '@plumber/types'

import { NextFunction, RequestHandler, Response, Router } from 'express'
import multer from 'multer'

import { createPipeHandler } from '@/controllers/recipes'
import logger from '@/helpers/logger'

const router = Router()
const upload = multer()

router.use(upload.none())

function exposeError(handler: RequestHandler) {
return async (req: IRequest, res: Response, next: NextFunction) => {
try {
logger.http({
webhookUrl: req.url,
body: req.body,
headers: req.headers,
})
await handler(req, res, next)
} catch (err) {
logger.error(err)
next(err)
}
}
}

router.post('/createPipe', exposeError(createPipeHandler))

export default router

0 comments on commit 0752b23

Please sign in to comment.