RPCMapper Class
The RPCMapper class maps RabbitEnvelop messages to a specific Data Entity Service layer and a specific Service layer action. It can be used inside a channel consumer function from a Worker, or inside a REST HTTP controller from an API, to execute a Data Entity Service action.
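The mapping described above can be pictured as a two-step lookup: first the entity, then the action on that entity's service. The following is a minimal sketch under stated assumptions, not the actual RPCMapper implementation (which this page does not show). `resolveAction` and the sample `services` registry are hypothetical names used only for illustration.

```javascript
// Hypothetical stand-in for the mapper's lookup step; the real RPCMapper
// internals are not shown in this document.
function resolveAction(services, job) {
  // Only accept entities registered directly on the registry object.
  if (!Object.prototype.hasOwnProperty.call(services, job.entity)) {
    return { error: `unknown entity: ${job.entity}` };
  }
  const service = services[job.entity];
  const handler = service[job.action];
  if (typeof handler !== "function") {
    return { error: `unknown action: ${job.entity}.${job.action}` };
  }
  // Bind so that handlers relying on `this` keep working.
  return { handler: handler.bind(service) };
}

// Illustrative registry with a single entity and a single action.
const services = {
  DemoUser: {
    getAll(job) {
      return { error: null, data: [{ _id: "1", name: "Ada" }], requestedBy: job.from };
    },
  },
};

const job = { from: "alice", entity: "DemoUser", action: "getAll" };
const { handler, error } = resolveAction(services, job);
const outcome = error ? { error } : handler(job);
console.log(outcome.data.length); // 1
```

Returning an `error` field instead of throwing mirrors the `{ error, data }` result shape that the controller examples on this page destructure.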
Those actions may be database related: Create, Update, Delete, List one, List all records.
Those actions may also be file system related: Upload, Copy, Move, Read, Edit, Delete files.
As discussed previously, sending and receiving messages is how Micro Services talk to each other.
The problem is that two underlying message protocols are handled here: HTTP messages (APIs) and AMQP messages (workers).
To solve this problem, we use the RabbitEnvelop class, which implements a Message Contract that serves as the standard format to send and receive Micro Service messages.
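To illustrate the idea of a single message contract shared by both protocols, here is a minimal sketch. `EnvelopeSketch` is a hypothetical stand-in, not the real RabbitEnvelop class: it models only the fields that appear in this page's examples (`from`, `entity`, `action`, `payload`), and the real class may carry more.

```javascript
// Hypothetical stand-in for RabbitEnvelop: only the fields used in this
// document's examples are modelled.
class EnvelopeSketch {
  constructor({ from, entity, action, payload = {} }) {
    this.from = from;
    this.entity = entity;
    this.action = action;
    this.payload = payload;
  }
}

const envelope = new EnvelopeSketch({
  from: "alice",
  entity: "DemoUser",
  action: "getById",
  payload: { _id: "42" },
});

// AMQP side: the envelope travels as a JSON-encoded buffer...
const amqpContent = Buffer.from(JSON.stringify(envelope));
// ...and the worker decodes it back into the same plain structure.
const received = JSON.parse(amqpContent.toString());

// HTTP side: a controller builds the same structure from the request and
// passes it to the service layer directly, with no serialisation step.
console.log(received.action === envelope.action); // true
```

Because both sides agree on the same field names, a service action can be written once and called from either an API controller or a worker.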
Using inside a REST HTTP controller
When a browser makes an HTTP request (AJAX) to a REST endpoint, it sends an HTTP message to a specific REST HTTP controller. This is how the HTTP protocol works: it is based on messages.
The REST HTTP controller then receives the HTTP message through a specific HTTP Method (GET, POST, PUT, DELETE, PATCH) implementation.
The received HTTP message is a combination of Message Headers and Message Body information.
Based on that information, the REST HTTP controller executes a specific action on the server.
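As a rough illustration of how a request can be translated into a service action name, consider the sketch below. Only the two GET entries (`getAll`, `getById`) come from the DemoUser controller on this page; the other action names, the `actionByRoute` table, and the `actionFor` helper are assumptions made for the example.

```javascript
// Only the two GET entries come from this document's DemoUser controller;
// the remaining action names are hypothetical placeholders.
const actionByRoute = {
  "GET collection": "getAll",
  "GET item": "getById",
  "POST collection": "create", // assumed name
  "PUT item": "update",        // assumed name
  "DELETE item": "delete",     // assumed name
};

// Pick an action from the HTTP method and whether the route carries an id.
function actionFor(method, hasId) {
  const key = `${method.toUpperCase()} ${hasId ? "item" : "collection"}`;
  return actionByRoute[key] ?? null;
}

console.log(actionFor("GET", false)); // getAll
console.log(actionFor("GET", true));  // getById
```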
Let's consider the following src/lib/api/controllers/DemoUser.js controller file example:
DemoUser controller
/**
    Import REST response functions
*/
import { result, notFound, errorResponse, accepted, created } from '../../response';

/**
    Import RabbitEnvelop message composer class
*/
import RabbitEnvelop from '../../RabbitEnvelop'

/**
    Import the running application instance to be able to use its RPC Mapper
*/
import app from '../../../app'

const
    entity = "DemoUser",
    queue = `JumentiX.${entity}`,
    primaryKeyName = '_id';

/**
    HTTP Method: GET
    HTTP route: /api/DemoUser
    SERVER PROCEDURE: List all Entities by consuming its service layer
*/
export async function list(req, res)
{
    try {
        const
            // set Service Procedure name
            action = "getAll",
            // set who asked for the job
            from = req.user,
            // create a job request message
            job = new RabbitEnvelop( { from, entity, action } ),
            /**
                RPC - application.mapperRPC.services.DemoUser.getAll( job )
                execute the job
            */
            { error, data, count, pages, limit, page } = await app.mapperRPC.services[ entity ][ action ]( job ),
            // set job message: the error text on failure, "action entity" on success
            message = error ? ( error.message || error ) : ( action + " " + entity );
        // check if there was an error executing the job
        if( error ) return errorResponse(res, { error, message })
        // respond with the job result
        return result(res, { data, count, pages, limit, page, message })
    } catch (error) {
        return errorResponse(res, { error, message: error.message })
    }
}
/**
    HTTP Method: GET
    HTTP route: /api/DemoUser/{id}
    SERVER PROCEDURE: Read a specific Entity by consuming its service layer
*/
export async function read(req, res) {
    try {
        const
            // set Service Procedure name
            action = "getById",
            // set who asked for the job
            from = req.user,
            // set payload to execute the job
            payload = {
                [ primaryKeyName ]: req.swagger.params.id.value,
            },
            // create a job request message
            job = new RabbitEnvelop( { from, entity, action, payload } ),
            /**
                RPC - application.mapperRPC.services.DemoUser.getById( job )
                execute the job
            */
            { error, data } = await app.mapperRPC.services[ entity ][ action ]( job ),
            // set job message: the error text on failure, "action entity" on success
            message = error ? ( error.message || error ) : ( action + " " + entity );
        // check if there was an error executing the job
        if( error ) return errorResponse(res, { error, message })
        // respond with the job result
        return result(res, { data, message })
    } catch (error) {
        return errorResponse(res, { error, message: error.message })
    }
}
Using inside a channel consumer function
When you set up a RabbitMQ Subscriber worker, it starts listening to a specific queue.
A worker must have a default channel consumer function, in other words, a callback function which is called every time the worker receives a message from that queue.
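The callback relationship described above can be sketched without a broker. `FakeChannel` below is a hypothetical in-memory stand-in that only mimics the consume/ack shape used by the worker on this page; it is not RabbitMQ and not a real client library. The queue name follows the `JumentiX.${entity}` pattern from the controller example.

```javascript
// FakeChannel is a hypothetical in-memory stand-in; it only mimics the
// consume/ack shape used by the worker in this document.
class FakeChannel {
  constructor() {
    this.consumers = new Map(); // queue name -> consumer callback
    this.acked = [];            // messages acknowledged so far
  }
  consume(queue, onMessage) {
    this.consumers.set(queue, onMessage);
  }
  ack(msg) {
    this.acked.push(msg);
  }
  // Helper for the sketch: simulate the broker delivering one message.
  deliver(queue, body) {
    const onMessage = this.consumers.get(queue);
    if (onMessage) onMessage({ content: Buffer.from(JSON.stringify(body)) });
  }
}

const channel = new FakeChannel();
const handled = [];

// The channel consumer function: called once per delivered message.
channel.consume("JumentiX.DemoUser", (msg) => {
  const job = JSON.parse(msg.content.toString());
  handled.push(job.action);
  channel.ack(msg);
});

channel.deliver("JumentiX.DemoUser", { entity: "DemoUser", action: "getAll" });
channel.deliver("JumentiX.DemoUser", { entity: "DemoUser", action: "getById" });
console.log(handled); // [ 'getAll', 'getById' ]
```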
Let's consider the following src/worker-MsWorker.js worker application file example:
worker-MsWorker worker
'use strict';
import chalk from 'chalk'
import Application from './lib/Application.js'
import RabbitClient from './lib/RabbitClient.js'
import RedisClient from './lib/RedisClient.js'
import MongoClient from './lib/MongoClient.js'
import SequelizeClient from './lib/SequelizeClient'
import RPCMapper from './lib/RPCMapper'
import { validateJob } from './lib/util'

class MsWorker extends SequelizeClient( MongoClient( RabbitClient( RedisClient( Application ) ) ) )
{
    constructor( c )
    {
        // a derived class must call super() before using `this`
        super( c )
        // worker constructor
    }

    start()
    {
        // start application services .....
    }

    /**
        channel consumer function
        pertinent to this application's business logic
        this function is called every time a message arrives
    */
    consumeMessage( msg )
    {
        let self = this;
        ( async () => {
            let job = null,
                isMessageValid = false;
            try {
                job = JSON.parse(msg.content.toString());
                isMessageValid = true;
            } catch (e) {
                // not valid JSON: isMessageValid stays false and is handled below
            }
            if ( ! isMessageValid )
            {
                // acknowledge the message so it is removed from the queue
                self.channel.ack(msg)
                self.console.error( 'job is not a valid JSON. The received message was removed from the queue.', msg.content.toString() )
                return
            }
            if ( ! validateJob( job ) )
            {
                // acknowledge the message so it is removed from the queue
                self.channel.ack(msg)
                self.console.error( 'job is not valid. The received message was removed from the queue.', job )
                return
            }
            /**
                msg is a RabbitMQ AMQP message
                job is a RabbitEnvelop Message
                execute the job
            */
            await self.mapperRPC.execute( job, msg )
        } )().catch( ( error ) => {
            // avoid an unhandled promise rejection if executing the job fails
            self.console.error( 'error executing the job.', error )
        } )
    }
}

export default MsWorker
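The parse-then-validate step in consumeMessage can be isolated into a small pure function, which makes it easier to test. The sketch below is an illustration only: `validateJobSketch` is a hypothetical check (the real validateJob in ./lib/util is not shown on this page and may apply different rules), and `parseJob` is a helper name introduced for the example.

```javascript
// validateJobSketch is a hypothetical check; the real validateJob in
// ./lib/util is not shown in this document and may apply different rules.
function validateJobSketch(job) {
  return (
    job !== null &&
    typeof job === "object" &&
    typeof job.entity === "string" && job.entity.length > 0 &&
    typeof job.action === "string" && job.action.length > 0
  );
}

// Decode an AMQP-style message and report why it was rejected, if it was.
function parseJob(msg) {
  let job;
  try {
    job = JSON.parse(msg.content.toString());
  } catch (e) {
    return { job: null, reason: "invalid JSON" };
  }
  if (!validateJobSketch(job)) {
    return { job: null, reason: "invalid job" };
  }
  return { job, reason: null };
}

const good = parseJob({ content: Buffer.from('{"entity":"DemoUser","action":"getAll"}') });
const broken = parseJob({ content: Buffer.from("{not json") });
const incomplete = parseJob({ content: Buffer.from('{"entity":"DemoUser"}') });
console.log(good.reason, broken.reason, incomplete.reason);
```

Separating the rejection reason from the logging lets the consumer decide in one place whether to acknowledge, log, or forward the message.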