RPCMapper Class
The RPCMapper class maps RabbitEnvelop messages to a specific Data Entity Service layer and a specific Service layer action
It can be used inside a channel consumer function from a Worker, or inside REST HTTP controller from an API to execute a Data Entity Service action.
Those actions may be database related: Create, Update, Delete, List one, List all records.
Not least, those actions may be fyle system related: Upload, Copy, Move, Read, Edit, Delete files.
As we were discussion prior, sending and receiving messages, is the way which Micro Services talk to each other.
The problem is that we have 2 underlying message protocols being handled here: HTTP (APIs) and AMQP (workers) messages
Then to solve above problem, we use the RabbitEnvelop class, which implements a Message Contract that will be used as standard format to send and receive Micro Service messages.
Back to Documentation main page
Using inside a REST HTTP controller
When a browser makes a HTTP request (ajax) to a REST end point, it is sending a HTTP message to a specific REST HTTP controller.
This is how HTTP protocol works. Based on messages.
The REST HTTP controller then receives the HTTP message over a specific HTTP Method (GET, POST, PUT, DEL, PATCH) implementation.
The received HTTP message is a combination of Message Headers and Message Body information.
Then based on that information, the REST HTTP controller execute a specific action on server.
Lets conside the following
src/lib/api/controllers/DemoUser.jscontroller file example:
DemoUser controller
/**
import REST response function
*/
import { result, notFound, errorResponse, accepted, created } from '../../response';
/**
Import RabbitEnvelop message composer class
*/
import RabbitEnvelop from '../../RabbitEnvelop'
/**
Import application running instance to be able to use it RPC Mapper
*/
import app from '../../../app'
const
entity = "DemoUser",
queue = `JumentiX.${entity}`,
primaryKeyName = '_id';
/**
HTTP Method: GET
HTTP route: /api/DemoUser
SERVER PROCEDURE: List all Entities by consuming it service layer
*/
export async function list(req, res)
{
try {
const
// set Service Procedure name
action = "getAll",
// set who asked for the job
from = req.user,
// create a job request message
job = new RabbitEnvelop( { from, entity, action }),
/**
RPC - application.mapperRPC.services.DemoUser.getAll( job )
execute the job
*/
{ error, data, count, pages, limit, page } = await app.mapperRPC.services[ entity ][ action ]( job ),
// set job message
message = ( error.message || error, ( action + " " + entity ) );
// check if is there a error executing the job
if( error ) return errorResponse(res, { error, message })
// respond job result
return result(res, { data, count, pages, limit, page, message })
} catch (error) {
return errorResponse(res, { error, message: error.message })
}
}
/**
HTTP Method: GET
HTTP route: /api/DemoUser/{id}
SERVER PROCEDURE: Read a specific Entity by consuming it service layer
*/
export async function read(req, res) {
try {
const
// set Service Procedure name
action = "getById",
// set who asked for the job
from = req.user,
// set payload to execute the job
payload = {
[ primaryKeyName ]: req.swagger.params.id.value,
},
// create a job request message
job = new RabbitEnvelop( { from, entity, action, payload }),
/**
RPC - application.mapperRPC.services.DemoUser.getById( job )
execute the job
*/
{ error, data } = await app.mapperRPC.services[ entity ][ action ]( job ),
// set job message
message = ( error.message || error, ( action + " " + entity ) );
// check if is there a error executing the job
if( error ) return errorResponse(res, { error, message })
// respond job result
return result(res, { data, message })
} catch (error) {
return errorResponse(res, { error, message: error.message })
}
}
Using inside a channel consumer function function
When you setup a RabbitMQ Subscriber worker, it starts listening to a specific queues.
A worker must have a default channel consumer function, in other words, a callback function which is called every time when the worker receives a message from that queue.
Lets conside the following
src/worker-MsWorker.jsworker application file example:
worker-MsWorker worker
'use strict';
import chalk from 'chalk'
import Application from './lib/Application.js'
import RabbitClient from './lib/RabbitClient.js'
import RedisClient from './lib/RedisClient.js'
import MongoClient from './lib/MongoClient.js'
import SequelizeClient from './lib/SequelizeClient'
import RPCMapper from './lib/RPCMapper'
import { validateJob } from './lib/util'
class MsWorker extends SequelizeClient( MongoClient( RabbitClient( RedisClient( Application ) ) ) )
{
constructor( c )
{
// worker constructor
}
start()
{
// start application services .....
}
//
/**
channel consumer function
pertinent to to this application bussins logic
this function is called all the times when a message arrives
*/
consumeMessage( msg )
{
let self = this;
( async () => {
let job = null,
isMessageValid = false;
try {
job = JSON.parse(msg.content.toString());
isMessageValid = true;
} catch (e) {
}
if ( ( ! isMessageValid ) || ( ! validateJob( job ) ))
{
self.channel.ack(msg)
if( ! isMessageValid )
{
self.console.error( 'job is not a valid JSON. The received message was removed from the queue.', job )
}
else if( ! validateJob( job ) ){
isMessageValid = false;
self.console.error( 'job is not valid. The received message was removed from the queue.', job )
}
return
}
/**
msg is a RabbitMQ AMQP message
job is a RabbitEnvelop Message
execute the job
*/
self.mapperRPC.execute( job, msg )
} )()
}
}
export default MsWorker