Skip to content

Commit

Permalink
Merge pull request #52 from topcoder-platform/PLAT-3506
Browse files Browse the repository at this point in the history
feat: process login events
  • Loading branch information
eisbilir authored Oct 3, 2023
2 parents 80c473f + 2f6608a commit ecb3146
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
1 change: 1 addition & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module.exports = {
// Kafka topics related to Creation and Update of User
USER_CREATE_TOPIC: process.env.USER_CREATE_TOPIC || 'identity.notification.create',
USER_UPDATE_TOPIC: process.env.USER_UPDATE_TOPIC || 'identity.notification.update',
USER_LOGIN_TOPIC: process.env.USER_LOGIN_TOPIC || 'member.action.login',

// Kafka output topics to be consumed by member-processor-es to save member profile data in Elasticsearch
USER_CREATE_OUTPUT_TOPIC: process.env.USER_CREATE_OUTPUT_TOPIC || 'member.action.profile.create',
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,8 @@
"env": [
"mocha"
]
},
"volta": {
"node": "8.11.3"
}
}
5 changes: 4 additions & 1 deletion src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ const dataHandler = async (messageSet, topic, partition) => {
logger.info('Ignore message.')
}
break
case config.USER_LOGIN_TOPIC:
await ProcessorService.processUserLogin(messageJSON, producer)
break
default:
throw new Error(`Invalid topic: ${topic}`)
}
Expand Down Expand Up @@ -103,7 +106,7 @@ producer
logger.info('Starting kafka consumer')
return consumer
.init([{
subscriptions: [config.USER_CREATE_TOPIC, config.USER_UPDATE_TOPIC],
subscriptions: [config.USER_CREATE_TOPIC, config.USER_UPDATE_TOPIC, config.USER_LOGIN_TOPIC],
handler: dataHandler
}])
})
Expand Down
54 changes: 53 additions & 1 deletion src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,61 @@ async function processUpdateUser (message, producer) {

processUpdateUser.schema = processCreateUser.schema

/**
* Process the User login event
* @param {Object} message the Kafka message in JSON format
* @param {Object} producer the Kafka producer
*/
async function processUserLogin (message, producer) {
const member = message.payload
const record = {
TableName: config.AMAZON_AWS_DYNAMODB_MEMBER_PROFILE_TABLE,
Key: {
userId: member.userId
},
UpdateExpression: `set lastLoginDate = :lastLoginDate`,
ExpressionAttributeValues: {
':lastLoginDate': member.lastLoginDate,
},
}
if (member.loginCount) {
record['UpdateExpression'] = record['UpdateExpression'] + `, loginCount = :loginCount`
record['ExpressionAttributeValues'][':loginCount'] = member.loginCount
}
await helper.updateRecord(record)
logger.info('DynamoDB record is updated successfully.')

// send output message to Kafka
const outputMessage = {
topic: config.USER_UPDATE_OUTPUT_TOPIC,
originator: config.OUTPUT_MESSAGE_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': 'application/json',
payload: member
}
await producer.send({ topic: outputMessage.topic, message: { value: JSON.stringify(outputMessage) } })
logger.info(`Member profile update message is successfully sent to Kafka topic ${outputMessage.topic}`)
}

processUserLogin.schema = {
message: joi.object().keys({
topic: joi.string().required(),
originator: joi.string().required(),
timestamp: joi.date().required(),
'mime-type': joi.string().required(),
payload: joi.object().keys({
userId: joi.number().required(),
loginCount: joi.number(),
lastLoginDate: joi.date().raw().required(),
}).unknown(true).required()
}).required(),
producer: joi.object().required()
}

module.exports = {
processCreateUser,
processUpdateUser
processUpdateUser,
processUserLogin
}

logger.buildService(module.exports)

0 comments on commit ecb3146

Please sign in to comment.