Skip to content

Commit

Permalink
adding internal api code for pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
dalehille committed Jan 29, 2020
1 parent 5a4c45f commit 28f3359
Show file tree
Hide file tree
Showing 11 changed files with 525 additions and 3 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ Razeedash-API is the interface used by
| S3_ACCESS_KEY_ID | if S3_ENDPOINT defined | n/a |
| S3_SECRET_ACCESS_KEY | if S3_ENDPOINT defined | n/a |
| S3_LOCATION_CONSTRAINT | no | 'us-standard'|
| S3_BUCKET_PREFIX | no | 'razee'|
| ORG_ADMIN_KEY | no | n/a |
| ADD_CLUSTER_WEBHOOK_URL | no | n/a |

If S3_ENDPOINT is defined then encrypted cluster YAML is stored in S3 otherwise
it will be stored in the mongoDB.

If S3_BUCKET_PREFIX is not defined then the s3 bucket will be named `razee-<your_razee_org_id>`

ORG_ADMIN_KEY is required if you plan on adding organizations using the api/v2/orgs endpoint

ADD_CLUSTER_WEBHOOK_URL signifies the webhook endpoint to hit when a cluster is added.
Expand Down
1 change: 1 addition & 0 deletions app/conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const conf = {
accessKeyId: process.env.S3_ACCESS_KEY_ID,
secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
locationConstraint: process.env.S3_LOCATION_CONSTRAINT || 'us-standard',
bucketPrefix: process.env.S3_BUCKET_PREFIX || 'razee'
}
};

Expand Down
6 changes: 4 additions & 2 deletions app/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const compression = require('compression');
const body_parser = require('body-parser');
const ebl = require('express-bunyan-logger');
const addRequestId = require('express-request-id')();
const {router, initialize} = require('./routes/index.js');
const {router, initialize, streamedRoutes} = require('./routes/index.js');
const log = require('./log').log;
const getBunyanConfig = require('./utils/bunyan.js').getBunyanConfig;
const port = 3333;
Expand All @@ -35,9 +35,11 @@ router.use(ebl(getBunyanConfig('razeedash-api')));

app.set('trust proxy', true);
app.use(addRequestId);
app.use(compression());

app.use(streamedRoutes); // routes where we don't wan't body-parser applied
app.use(body_parser.json({ limit: '8mb' }));
app.use(body_parser.urlencoded({ extended: false }));
app.use(compression());
app.set('port', port);
app.use(router);

Expand Down
11 changes: 11 additions & 0 deletions app/routes/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

const express = require('express');
const router = express.Router();
const streamedRoutes = express.Router();
const asyncHandler = require('express-async-handler');

const getBunyanConfig = require('../utils/bunyan.js').getBunyanConfig;
Expand All @@ -37,6 +38,9 @@ const Install = require('./install');
const Clusters = require('./v2/clusters.js');
const Resources = require('./v2/resources.js');
const Orgs = require('./v2/orgs.js');
const Channels = require('./v2/channels.js');
const ChannelsStream = require('./v2/channelsStream.js');
const Subscriptions = require('./v2/subscriptions.js');

router.use('/api/kube', Kube);
router.use(ebl(getBunyanConfig('/api/v2/')));
Expand Down Expand Up @@ -79,6 +83,13 @@ router.use('/api/install', Install);
router.use('/api/v2/clusters', Clusters);
router.use('/api/v2/resources', Resources);

router.use('/api/v1/channels', Channels);
// streamedRoutes is defined so we can have a path that isn't affected by body-parser.
// The POST route in ChannelsStream uses streams to upload to s3 and this would break
// if we applied the body-parser middelware on this route.
streamedRoutes.use('/api/v1/channels', ChannelsStream);
router.use('/api/v1/subscriptions', Subscriptions);

async function initialize(){
const options = {
'collection-indexes': {
Expand Down
165 changes: 165 additions & 0 deletions app/routes/v2/channels.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
const express = require('express');
const router = express.Router();
const asyncHandler = require('express-async-handler');
const ebl = require('express-bunyan-logger');
const getBunyanConfig = require('../../utils/bunyan.js').getBunyanConfig;
const mongoConf = require('../../conf.js').conf;
const MongoClientClass = require('../../mongo/mongoClient.js');
const MongoClient = new MongoClientClass(mongoConf);
const conf = require('../../conf.js').conf;
const S3ClientClass = require('../../s3/s3Client');
const uuid = require('uuid/v4');
const url = require('url');
const crypto = require('crypto');
const tokenCrypt = require('../../utils/crypt');
const algorithm = 'aes-256-cbc';

const getOrg = require('../../utils/orgs.js').getOrg;
const requireAuth = require('../../utils/api_utils.js').requireAuth;

router.use(ebl(getBunyanConfig('razee-api/v1Channels')));

router.use(asyncHandler(async (req, res, next) => {
req.db = await MongoClient.getClient();
next();
}));

// get all channels for an org
// curl --request GET \
// --url http://localhost:3333/api/v1/channels \
// --header 'razee-org-key: orgApiKey-api-key-goes-here'
router.get('/', getOrg, requireAuth, asyncHandler(async(req, res)=>{
try {
const orgId = req.org._id;
const Channels = req.db.collection('channels');
const channels = await Channels.find({ org_id: orgId }).toArray();
res.status(200).json({status: 'success', channels: channels});
} catch (error) {
req.log.error(error);
return res.status(500).json({ status: 'error', message: error});
}
}));

// create a new channel
// curl --request POST \
// --url http://localhost:3333/api/v1/channels\
// --header 'content-type: application/json' \
// --header 'razee-org-key: orgApiKey-api-key-goes-here' \
// --data '{"name": "channel-name-here"}'
router.post('/', getOrg, requireAuth, asyncHandler(async(req, res, next)=>{
try {
const orgId = req.org._id;
const newDeployable = req.body.name;

const Channels = req.db.collection('channels');
const nameAlreadyExists = await Channels.find({
org_id: orgId,
name: newDeployable
}).count();

if(nameAlreadyExists) {
res.status(403).json({ status: 'error', message: 'This deployable name already exists' });
} else {
const deployableId = uuid();
let resp = await Channels.insertOne({ 'org_id': orgId, 'name': newDeployable, 'uuid': deployableId, 'created': new Date(), 'versions': []});
if(resp.insertedCount == 1) {
res.status(200).json({ status: 'success', id: deployableId, 'name': newDeployable });
} else {
res.status(403).json({ status: 'error', message: 'Error inserting a new deployable'});
}
}
} catch (error) {
req.log.error(error);
next(error);
}
}));

// Get yaml for a channel. Retrieves this data either from mongo or from COS
// curl --request GET \
// --url http://localhost:3333/api/v1/channels/:channelName/:versionId \
// --header 'razee-org-key: orgApiKey-api-key-goes-here' \
router.get('/:channelName/:versionId', getOrg, asyncHandler(async(req, res, next)=>{
var orgId = req.org._id;
var channelName = req.params.channelName + '';
var versionId = req.params.versionId + '';
var Channels = req.db.collection('channels');
var DeployableVersions = req.db.collection('deployableVersions');

var deployable = await Channels.findOne({ org_id: orgId, name: channelName});
if(!deployable){
res.status(404).send({status: 'error', message: `channel "${channelName}" not found for this org`});
return;
}

var deployableVersion = await DeployableVersions.findOne({ org_id: orgId, channel_id: deployable.uuid, uuid: versionId });
if(!deployableVersion){
res.status(404).send({status: 'error', message: `versionId "${versionId}" not found`});
return;
}

if(deployableVersion.location === 's3') {
if (conf.s3.endpoint) {
try {
const s3Client = new S3ClientClass(conf);
const link = url.parse(deployableVersion.content);
const iv = Buffer.from(deployableVersion.iv, 'base64');
const paths = link.path.split('/');
const bucket = paths[1];
const resourceName = decodeURI(paths[2]);
const key = Buffer.concat([Buffer.from(req.org.apiKey)], 32);
const decipher = crypto.createDecipheriv(algorithm, key, iv);
const s3stream = s3Client.getObject(bucket, resourceName).createReadStream();
s3stream.on('error', function(error) {
req.log.error(error);
return res.status(403).json({ status: 'error', message: error.message});
});
s3stream.pipe(decipher).pipe(res);
s3stream.on('httpError', (error) => {
req.log.error(error, 'Error GETting data using the S3 client');
if (!res.headersSent) {
res.status(error.statusCode || 500).json(error);
} else {
next(error);
}
});
} catch (error) {
return res.status(403).json({ status: 'error', message: error.message});
}
} else {
return res.status(403).json({ status: 'error', message: 'An endpoint must be configured for the S3 client'});
}
} else {
// in this case the resource was stored directly in mongo rather than in COS
try {
const data = tokenCrypt.decrypt(deployableVersion.content, req.org.apiKey);
res.set('Content-Type', deployableVersion.type);
res.status(200).send(data);
} catch (error) {
return res.status(500).json({ status: 'error', message: error });
}
}
}));

// Get an individual channel object
// curl --request GET \
// --url http://localhost:3333/api/v1/channels/:channelName \
// --header 'razee-org-key: orgApiKey-api-key-goes-here' \
router.get('/:channelName', getOrg, requireAuth, asyncHandler(async(req, res)=>{
const orgId = req.org._id;
const channelName = req.params.channelName + '';

try {
const Channels = req.db.collection('channels');
const channel = await Channels.findOne({ org_id: orgId, name: channelName});
if(!channel){
res.status(404).send({status: 'error', message: `channel ${channelName} not found for this org`});
return;
} else {
return res.status(200).send({status: 'success', channel: channel});
}
} catch (error) {
return res.status(500).send({status: 'error', message: error});
}
}));

export { router as Channels };
150 changes: 150 additions & 0 deletions app/routes/v2/channelsStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
const express = require('express');
const router = express.Router();
const asyncHandler = require('express-async-handler');
const ebl = require('express-bunyan-logger');
const getBunyanConfig = require('../../utils/bunyan.js').getBunyanConfig;
const mongoConf = require('../../conf.js').conf;
const MongoClientClass = require('../../mongo/mongoClient.js');
const MongoClient = new MongoClientClass(mongoConf);
const conf = require('../../conf.js').conf;
const uuid = require('uuid/v4');
const S3ClientClass = require('../../s3/s3Client');
const AWS = require('aws-sdk');
const crypto = require('crypto');
const algorithm = 'aes-256-cbc';

const getOrg = require('../../utils/orgs.js').getOrg;
const requireAuth = require('../../utils/api_utils.js').requireAuth;
const encryptResource = require('../../utils/api_utils.js').encryptResource;

router.use(ebl(getBunyanConfig('razee-api/v1Channels')));

router.use(asyncHandler(async (req, res, next) => {
req.db = await MongoClient.getClient();
next();
}));

// Create a new resource version for a channel. This route was created separate from
// channels.js so we can have a route in src/server.js where body-parser isn't applied
// curl --request POST \
// --url http://localhost:3333/api/v1/channels/:channelName/version \
// --header 'content-type: [application/json | application/yaml]' \
// --header 'razee-org-key: orgApiKey-api-key-goes-here' \
// --header 'resource-name: name-of-the-new-resource-version' \
// --header 'resource-description: optional-description-of-the-new-resource-version' \
// --header 'x-api-key: razee-user-api-key' \
// --header 'x-user-id: razee-user-id' \
// --data @filename.goes.here.yaml
router.post('/:channelName/version', getOrg, requireAuth, asyncHandler(async(req, res)=>{
try {
if (!req.get('resource-name')) {
return res.status(400).send('A resource-name name was not included in the header');
}

if (!req.get('content-type')) {
return res.status(400).send('A Content-Type header of application/json or application/yaml must be included');
}

const version = {
description: req.get('resource-description'),
name: req.get('resource-name'),
type: req.get('content-type')
};

version.uuid = uuid();

if (!req.params.channelName) {
return res.status(400).send('A channel name field was not included in the POST request');
}

const orgId = req.org._id;
const channelName = req.params.channelName + '';
const Channels = req.db.collection('channels');
const DeployableVersions = req.db.collection('deployableVersions');
const existingChannel = await Channels.findOne({
org_id: orgId,
name: channelName
});

if(existingChannel) {
const versions = await DeployableVersions.find({channel_name: existingChannel.name}).toArray();
const versionNameExists = versions.filter( (existingVersion) => existingVersion.name === version.name );

if(versionNameExists && versionNameExists.length > 0) {
return res.status(403).json({ status: 'error', message: `The version name ${version.name} already exists`});
}

let location, data;
const iv = crypto.randomBytes(16);
const ivText = iv.toString('base64');

if (conf.s3.endpoint) {
try {
const resourceName = existingChannel.name + '-' + version.name;
const bucket = `${conf.s3.bucketPrefix}-${orgId.toLowerCase()}`;
const s3Client = new S3ClientClass(conf);
try {
const exists = await s3Client.bucketExists(bucket);
if (!exists) {
req.log.warn({ bucket: bucket }, 'bucket does not exist');
await s3Client.createBucket(bucket);
}
} catch (error) {
req.log.error({ bucket: bucket }, 'could not create bucket');
return res.status(500).json({ status: 'error', message: error.message});
}
const s3 = new AWS.S3(conf.s3);
const key = Buffer.concat([Buffer.from(req.org.apiKey)], 32);
const encrypt = crypto.createCipheriv(algorithm, key, iv);
const pipe = req.pipe(encrypt);
const params = {Bucket: bucket, Key: resourceName, Body: pipe};
const upload = s3.upload( params );
await upload.promise();

data = `https://${conf.s3.endpoint}/${bucket}/${resourceName}`;
location = 's3';
} catch (error) {
req.log.error( 'S3 upload error', error );
return res.status(403).json({ status: 'error', message: error.message});
}
} else {
data = await encryptResource(req);
location = 'mongo';
}

await DeployableVersions.insertOne({
'org_id': orgId,
'channel_id': existingChannel.uuid,
'channel_name': existingChannel.name,
'name': version.name,
'description': version.description,
'uuid': version.uuid,
'content': data,
'iv': ivText,
'location': location,
'type': version.type,
'created': new Date()
});

const versionObj = {
'uuid': version.uuid,
'name': version.name,
'description': version.description,
'location': location
};

await Channels.updateOne(
{ org_id: orgId, uuid: existingChannel.uuid },
{ $push: { versions: versionObj } }
);
return res.status(200).json({ status: 'success', version: versionObj});
} else {
return res.status(404).json({ status: 'error', message: 'This channel was not found'});
}
} catch (error) {
req.log.info( error.stack );
return res.status(500).json({ status: 'error', message: error});
}
}));

export { router as ChannelsStream };
Loading

0 comments on commit 28f3359

Please sign in to comment.