forked from PipedreamHQ/pipedream
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Salesforce timer-based event sources
* Refactor Salesforce event sources for every SObject types to be timer-based * Remove everything related to the SOAP API and Apex, and rely solely on the REST API `getUpdated` and `getDeleted` endpoints
- Loading branch information
Showing
5 changed files
with
224 additions
and
397 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,152 +1,81 @@ | ||
const slugify = require("slugify"); | ||
const { v4: uuidv4 } = require("uuid"); | ||
|
||
const salesforce = require("./salesforce.app"); | ||
|
||
module.exports = { | ||
dedupe: "unique", | ||
props: { | ||
db: "$.service.db", | ||
http: { | ||
type: "$.interface.http", | ||
customResponse: true, | ||
}, | ||
salesforce, | ||
objectType: { propDefinition: [salesforce, "objectType"] }, | ||
timer: { | ||
type: '$.interface.timer', | ||
default: { | ||
intervalSeconds: 60 * 15, // 15 minutes | ||
}, | ||
}, | ||
objectType: { | ||
type: "string", | ||
label: "Object Type", | ||
description: "The type of object for which to monitor events", | ||
async options(context) { | ||
const { page } = context; | ||
if (page !== 0) { | ||
return { | ||
options: [], | ||
}; | ||
} | ||
|
||
const { sobjects } = await this.salesforce.listSObjectTypes(); | ||
const options = sobjects | ||
.filter(this.isValidSObject) | ||
.map(sobject => ({ | ||
label: sobject.label, | ||
value: sobject.name, | ||
})); | ||
return { | ||
options, | ||
}; | ||
}, | ||
}, | ||
}, | ||
hooks: { | ||
async activate() { | ||
// Retrieve base API URL's for this account | ||
const apiUrls = await this.salesforce.getApiUrls(); | ||
this.db.set("apiUrls", apiUrls); | ||
|
||
// Enable this Salesforce account to make HTTP | ||
// calls to this event source's endpoint. | ||
// API docs: https://sforce.co/3jAeV0G | ||
const { metadata: metadataApiUrl } = apiUrls; | ||
const remoteSiteName = this._getRemoteSiteName(); | ||
const { endpoint: endpointUrl } = this.http; | ||
await this.salesforce.createRemoteSite(metadataApiUrl, remoteSiteName, endpointUrl); | ||
this.db.set("remoteSiteName", remoteSiteName); | ||
const latestDateCovered = this.db.get("latestDateCovered"); | ||
if (!latestDateCovered) { | ||
const now = new Date().toISOString(); | ||
this.db.set("latestDateCovered", now); | ||
} | ||
|
||
// Create a custom Apex class to handle the HTTP | ||
// request logic of the webhook. | ||
const secretToken = uuidv4(); | ||
const webhookName = this._getWebhookName(); | ||
const webhookBody = this.getWebhookBody(webhookName, secretToken); | ||
const { id: webhookId } = await this.salesforce.createApexClass(webhookName, webhookBody); | ||
this.db.set("secretToken", secretToken); | ||
this.db.set("webhookId", webhookId); | ||
|
||
// Create a custom Apex trigger to listen to | ||
// the relevant events and dispatch the webhook | ||
// call(s) accordingly. | ||
const triggerName = this._getTriggerName(); | ||
const triggerBody = this.getTriggerBody(triggerName, webhookName); | ||
const objectType = this.getObjectType(); | ||
const { id: triggerId } = | ||
await this.salesforce.createApexTrigger(triggerName, triggerBody, objectType); | ||
this.db.set("triggerId", triggerId); | ||
}, | ||
async deactivate() { | ||
const triggerId = this.db.get("triggerId"); | ||
await this.salesforce.deleteApexTrigger(triggerId); | ||
|
||
const webhookId = this.db.get("webhookId"); | ||
await this.salesforce.deleteApexClass(webhookId); | ||
|
||
const { metadata: metadataApiUrl } = this.db.get("apiUrls"); | ||
const remoteSiteName = this.db.get("remoteSiteName"); | ||
await this.salesforce.deleteRemoteSite(metadataApiUrl, remoteSiteName); | ||
const nameField = await this.salesforce.getNameFieldForObjectType(this.objectType); | ||
this.db.set("nameField", nameField); | ||
}, | ||
}, | ||
methods: { | ||
_getRemoteSiteName() { | ||
const eventSourceName = this.getEventSourceName(); | ||
return this.standardizeName(eventSourceName); | ||
}, | ||
_getTriggerName() { | ||
return this.standardizeName("trigger"); | ||
}, | ||
_getWebhookName() { | ||
return this.standardizeName("webhook"); | ||
}, | ||
_isValidSource(event) { | ||
const { | ||
"x-webhook-token": webhookToken, | ||
} = event.headers; | ||
const secretToken = this.db.get("secretToken"); | ||
return webhookToken === secretToken; | ||
isValidSObject(sobject) { | ||
// Only the activity of those SObject types that have the `replicateable` | ||
// flag set is published via the `getUpdated` API. | ||
// | ||
// See the API docs here: https://sforce.co/3gDy3uP | ||
return sobject.replicateable; | ||
}, | ||
standardizeName(rawName) { | ||
const options = { | ||
replacement: "_", | ||
remove: /[()-]/g, | ||
lower: true, | ||
}; | ||
const id = uuidv4(); | ||
const enrichedRawName = `pd_${rawName}_${id}`; | ||
return slugify(enrichedRawName, options).substring(0, 40); | ||
}, | ||
processEvent(event) { | ||
const { body } = event; | ||
const meta = this.generateMeta(event); | ||
this.$emit(body, meta); | ||
}, | ||
generateMeta() { | ||
throw new Error('generateMeta is not implemented'); | ||
}, | ||
getEventSourceName() { | ||
throw new Error('getEventSourceName is not implemented'); | ||
}, | ||
getEventTypes() { | ||
throw new Error('getEventTypes is not implemented'); | ||
}, | ||
getObjectType() { | ||
throw new Error('getObjectType is not implemented'); | ||
}, | ||
getTriggerBody() { | ||
throw new Error('getTriggerBody is not implemented'); | ||
}, | ||
getWebhookBody(webhookName, secretToken) { | ||
return ` | ||
public abstract class ${webhookName} { | ||
public static String jsonContent(final Map<String, Object> eventContent) { | ||
final Map<String, Object> content = new Map<String, Object>(eventContent); | ||
content.put('UserId', UserInfo.getUserId()); | ||
return JSON.serialize(content); | ||
} | ||
@future(callout=true) | ||
public static void callout(final String url, final String content) { | ||
final HttpRequest request = new HttpRequest(); | ||
request.setEndpoint(url); | ||
request.setMethod('POST'); | ||
request.setHeader('Content-Type', 'application/json'); | ||
request.setHeader('X-Webhook-Token', '${secretToken}'); | ||
request.setBody(content); | ||
final Http http = new Http(); | ||
http.send(request); | ||
} | ||
} | ||
`; | ||
processEvent() { | ||
throw new Error("processEvent is not implemented"); | ||
}, | ||
}, | ||
async run(event) { | ||
if (!this._isValidSource(event)) { | ||
this.http.respond({ | ||
statusCode: 404, | ||
}); | ||
const startTimestamp = this.db.get("latestDateCovered"); | ||
const endTimestamp = new Date(event.timestamp * 1000).toISOString(); | ||
const timeDiffSec = Math.floor( | ||
(Date.parse(endTimestamp) - Date.parse(startTimestamp)) / 1000 | ||
); | ||
if (timeDiffSec < 60) { | ||
console.log(` | ||
Skipping execution since the last one happened approximately ${timeDiffSec} seconds ago | ||
`); | ||
return; | ||
} | ||
|
||
this.http.respond({ | ||
statusCode: 200, | ||
await this.processEvent({ | ||
startTimestamp, | ||
endTimestamp, | ||
}); | ||
|
||
await this.processEvent(event); | ||
}, | ||
}; |
Oops, something went wrong.