src/client/helper.js
import { CometD, Transports } from 'zetapush-cometd'
import { ConnectionStatusListener } from '../connection/connection-status'
import { Macro } from '../mapping/services'
import { getServers, isDerivedOf, shuffle, uuid } from '../utils/index'
/**
* CometD Messages enumeration
* @type {Object}
*/
const Message = {
RECONNECT_HANDSHAKE_VALUE: 'handshake',
RECONNECT_NONE_VALUE: 'none',
RECONNECT_RETRY_VALUE: 'retry'
}
/**
* Delay to update server url
* @type {integer}
*/
const UPDATE_SERVER_URL_DELAY = 250
/**
* Default macro channel
* @type {string}
*/
const DEFAULT_MACRO_CHANNEL = 'completed'
/**
* Provide utilities and abstraction on CometD Transport layer
* @access private
*/
export class ClientHelper {
/**
* Create a new ZetaPush client helper
*/
constructor({ apiUrl, sandboxId, forceHttps = false, authentication, resource = null, transports = Transports }) {
/**
* @access private
* @type {string}
*/
this.sandboxId = sandboxId
/**
* @access private
* @type {function():AbstractHandshake}
*/
this.authentication = authentication
/**
* @access private
* @type {string}
*/
this.resource = resource
/**
* @access private
* @type {number}
*/
this.requestId = 0
/**
* @access private
* @type {string}
*/
this.userId = null
/**
* @access private
* @type {Object}
*/
this.userInfo = null
/**
* @access private
* @type {string}
*/
this.uniqId = uuid()
/**
* @access private
* @type {Promise}
*/
this.servers = getServers({ apiUrl, sandboxId, forceHttps, transports }).catch((error) => {
// Notify error in connection to server step
this.connectionToServerFail(error)
// Return empty list
return []
})
/**
* @access private
* @type {Array<Object>}
*/
this.connectionListeners = []
/**
* @access private
* @type {boolean}
*/
this.connected = false
/**
* @access private
* @type {boolean}
*/
this.wasConnected = false
/**
* @access private
* @type {string}
*/
this.serverUrl = null
/**
* @access private
* @type {string}
*/
this.sessionId = null
/**
* @access private
* @type {Array<Object>}
*/
this.subscribeQueue = []
/**
* @access private
* @type {CometD}
*/
this.cometd = new CometD()
// Register transports layers
transports.ALL.forEach(({ type, Transport }) => {
this.cometd.registerTransport(type, new Transport())
})
// Handle transport exception
this.cometd.onTransportException = (cometd, transport) => {
// Try to find an other available server
// Remove the current one from the _serverList array
this.updateServerUrl()
}
this.cometd.addListener('/meta/handshake', ({ ext, successful, advice, error }) => {
this.cometd._debug('ClientHelper::/meta/handshake', { ext, successful, advice, error })
if (successful) {
const { authentication = null } = ext
this.initialized(authentication)
} else {
this.handshakeFailure(error)
}
})
this.cometd.addListener('/meta/handshake', ({ advice, error, ext, successful }) => {
this.cometd._debug('ClientHelper::/meta/handshake', { ext, successful, advice, error })
// AuthNegotiation
if (!successful) {
if (typeof advice === 'undefined') {
return
}
if (Message.RECONNECT_NONE_VALUE === advice.reconnect) {
this.authenticationFailed(error)
} else if (Message.RECONNECT_HANDSHAKE_VALUE === advice.reconnect) {
this.negotiationFailed(error)
}
}
})
this.cometd.addListener('/meta/connect', ({ advice, channel, successful }) => {
this.cometd._debug('ClientHelper::/meta/connect', { advice, channel, successful })
// ConnectionListener
if (this.cometd.isDisconnected()) {
this.connected = false
// Notify connection will close
this.connectionWillClose()
} else {
this.wasConnected = this.connected
this.connected = successful
if (!this.wasConnected && this.connected) {
this.cometd.batch(this, () => {
// Unqueue subscriptions
this.subscribeQueue.forEach(({ prefix, listener, subscriptions }) => {
this.subscribe(prefix, listener, subscriptions)
})
})
// Notify connection is established
this.connectionEstablished()
} else if (this.wasConnected && !this.connected) {
// Notify connection is broken
this.connectionBroken()
}
}
})
this.cometd.addListener('/meta/disconnect', ({ channel, successful }) => {
this.cometd._debug('ClientHelper::/meta/disconnect', { channel, successful })
if (this.cometd.isDisconnected()) {
this.connected = false
// Notify connection is closed
this.connectionClosed()
}
})
}
/**
* Add a connection listener to handle life cycle connection events
* @param {ConnectionStatusListener} listener
* @return {number} handler
*/
addConnectionStatusListener(listener) {
this.connectionListeners.push({
enabled: true,
listener: Object.assign(new ConnectionStatusListener(), listener)
})
return this.connectionListeners.length - 1
}
/**
* Notify listeners when handshake step succeed
*/
authenticationFailed(error) {
this.userId = null
this.userInfo = null
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onFailedHandshake(error)
})
}
/**
* Connect client using CometD Transport
*/
connect() {
this.servers.then((servers) => {
if (servers.length > 0) {
// Get a random server url
this.serverUrl = shuffle(servers)
// Configure CometD
this.cometd.configure({
url: `${this.serverUrl}/strd`,
backoffIncrement: 1000,
maxBackoff: 60000,
appendMessageTypeToURL: false
})
// Send handshake fields
this.cometd.handshake(this.getHandshakeFields())
} else {
// No servers available
this.noServerUrlAvailable()
}
})
}
/**
* Notify listeners when connection is broken
*/
connectionBroken() {
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onConnectionBroken()
})
}
/**
* Notify listeners when connection is closed
*/
connectionClosed() {
this.userId = null
this.userInfo = null
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onConnectionClosed()
})
}
/**
* Notify listeners when connection is established
*/
connectionEstablished() {
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onConnectionEstablished()
})
}
/**
* Notify listeners when connection to server fail
*/
connectionToServerFail(failure) {
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onConnectionToServerFail(failure)
})
}
/**
* Notify listeners when connection will close
*/
connectionWillClose() {
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onConnectionWillClose()
})
}
/**
* Create a promise based macro service
* @experimental
* @param {{listener: Object, Type: class, deploymentId: string}} parameters
* @return {Object} service
*/
createAsyncMacroService({ listener, Type, deploymentId = Type.DEFAULT_DEPLOYMENT_ID }) {
const prefix = `/service/${this.getSandboxId()}/${deploymentId}`
const $publish = this.getAsyncMacroPublisher(prefix)
// Create service by publisher
return this.createServiceByPublisher({ listener, prefix, Type, $publish })
}
/**
* Create a publish/subscribe service
* @param {{listener: Object, Type: class, deploymentId: string}} parameters
* @return {Object} service
*/
createService({ listener, Type, deploymentId = Type.DEFAULT_DEPLOYMENT_ID }) {
const isMacroType = isDerivedOf(Type, Macro)
const prefix = `/service/${this.getSandboxId()}/${deploymentId}`
const $publish = isMacroType ? this.getMacroPublisher(prefix) : this.getServicePublisher(prefix)
// Create service by publisher
return this.createServiceByPublisher({ listener, prefix, Type, $publish })
}
/**
* @param {{listener: Object, prefix: string, Type: class, $publish: Function}} parameters
* @return {Object} service
*/
createServiceByPublisher({ listener, prefix, Type, $publish }) {
const service = new Type({ $publish })
// Store subscription in service instance
service.$subscriptions = this.subscribe(prefix, listener)
return service
}
/**
* Disconnect CometD client
*/
disconnect() {
this.cometd.disconnect(true)
}
/**
* Get a publisher for a macro service that return a promise
* @experimental
* @param {string} prefix - Channel prefix
* @return {Function} publisher
*/
getAsyncMacroPublisher(prefix) {
return (name, parameters, hardFail = false, debug = 1) => {
const channel = `${prefix}/call`
const uniqRequestId = this.getUniqRequestId()
const subscriptions = {}
return new Promise((resolve, reject) => {
const handler = ({ data = {} }) => {
const { result = {}, errors = [], requestId } = data
if (requestId === uniqRequestId) {
// Handle errors
if (errors.length > 0) {
reject(errors)
} else {
resolve(result)
}
this.unsubscribe(subscriptions)
}
}
// Create dynamic listener method
const listener = {
[name]: handler,
[DEFAULT_MACRO_CHANNEL]: handler
}
// Ad-Hoc subscription
this.subscribe(prefix, listener, subscriptions)
// Publish message on channel
this.publish(channel, {
debug,
hardFail,
name,
parameters,
requestId: uniqRequestId
})
})
}
}
/**
* Get client id
* @return {string} clientId
*/
getClientId() {
return this.cometd.getClientId()
}
/**
* Get CometD handshake parameters
* @return {Object}
*/
getHandshakeFields() {
const handshake = this.authentication()
return handshake.getHandshakeFields(this)
}
/**
* Get a publisher for a macro service
* @param {string} prefix - Channel prefix
* @return {Function} publisher
*/
getMacroPublisher(prefix) {
return (name, parameters, hardFail = false, debug = 1) => {
const channel = `${prefix}/call`
const requestId = this.getUniqRequestId()
return this.publish(channel, {
debug,
hardFail,
name,
parameters,
requestId
})
}
}
/**
* Get queued subscription index
* @return {Object} index
*/
getQueuedSubscription(subscriptions = {}) {
const index = this.subscribeQueue.findIndex((element) => subscriptions === element.subscriptions)
return {
index,
queued: index > -1
}
}
/**
* Get resource
* @return {string}
*/
getResource() {
return this.resource
}
/**
* Get sandbox id
* @return {string}
*/
getSandboxId() {
return this.sandboxId
}
/**
* Get server urls list
* @return {Promise} servers
*/
getServers() {
return this.servers
}
/**
* Get a publisher for a service
* @param {string} prefix - Channel prefix
* @return {Function} publisher
*/
getServicePublisher(prefix) {
return (method, parameters) => {
const channel = `${prefix}/${method}`
return this.publish(channel, parameters)
}
}
/**
* Get uniq request id
* @return {string}
*/
getUniqRequestId() {
return `${this.getClientId()}:${this.uniqId}:${++this.requestId}`
}
/**
* Get user id
* @return {string}
*/
getUserId() {
return this.userId
}
/**
* Get user info
* @return {Objet}
*/
getUserInfo() {
return this.userInfo
}
/**
* Manage handshake failure case
*/
handshakeFailure() {
this.userId = null
this.userInfo = null
}
/**
* Notify listeners when connection is established
*/
initialized(authentication) {
if (authentication) {
this.userId = authentication.userId
this.userInfo = authentication.userInfo
}
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onSuccessfulHandshake(authentication)
})
}
/**
* Is client connected to ZetaPush
* @return {boolean}
*/
isConnected() {
return !this.cometd.isDisconnected()
}
/**
* Notify listeners when a message is lost
*/
messageLost(channel, data) {
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onMessageLost(channel, data)
})
}
/**
* Negociate authentication
* @param {error} error
*/
negotiationFailed(error) {
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onNegotiationFailed(error)
})
}
/**
* Notify listeners when no server url available
*/
noServerUrlAvailable() {
this.connectionListeners
.filter(({ enabled }) => enabled)
.forEach(({ listener }) => {
listener.onNoServerUrlAvailable()
})
}
/**
* Wrap CometdD publish method
* @param {String} channel
* @param {Object} parameters
* @return {Object}
*/
publish(channel, parameters = {}) {
this.cometd.publish(channel, parameters)
return { channel, parameters }
}
/**
* Remove a connection status listener
*/
removeConnectionStatusListener(handler) {
const listener = this.connectionListeners[handler]
if (listener) {
listener.enabled = false
}
}
/**
* Set a new authentication methods
* @param {function():AbstractHandshake} authentication
*/
setAuthentication(authentication) {
this.authentication = authentication
}
/**
* Set logging level for CometD client
* Valid values are the strings 'error', 'warn', 'info' and 'debug', from
* less verbose to more verbose.
* @param {string} level
*/
setLogLevel(level) {
this.cometd.setLogLevel(level)
}
/**
* Subsribe all methods defined in the listener for the given prefixed channel
* @param {string} prefix - Channel prefix
* @param {Object} listener
* @param {Object} subscriptions
* @return {Object} subscriptions
*/
subscribe(prefix, listener = {}, subscriptions = {}) {
const { queued } = this.getQueuedSubscription(subscriptions)
if (!queued) {
// Store arguments to renew subscriptions on connection
this.subscribeQueue.push({ prefix, listener, subscriptions })
}
// Subscribe if user is connected
if (!this.cometd.isDisconnected()) {
for (let method in listener) {
if (listener.hasOwnProperty(method)) {
const channel = `${prefix}/${method}`
subscriptions[method] = this.cometd.subscribe(channel, listener[method])
}
}
}
return subscriptions
}
/**
* Remove current server url from the server list and shuffle for another one
*/
updateServerUrl() {
this.servers.then((servers) => {
const index = servers.indexOf(this.serverUrl)
if (index > -1) {
servers.splice(index, 1)
}
if (servers.length === 0) {
// No more server available
this.noServerUrlAvailable()
} else {
this.serverUrl = shuffle(servers)
this.cometd.configure({
url: `${this.serverUrl}/strd`
})
setTimeout(() => {
this.cometd.handshake(this.getHandshakeFields())
}, UPDATE_SERVER_URL_DELAY)
}
})
}
/**
* Remove all subscriptions
* @param {Object} subscriptions
*/
unsubscribe(subscriptions = {}) {
// Unsubscribe
for (let method in subscriptions) {
if (subscriptions.hasOwnProperty(method)) {
const subscription = subscriptions[method]
this.cometd.unsubscribe(subscription)
}
}
// Remove subscription from queue
const { index, queued } = this.getQueuedSubscription(subscriptions)
if (queued) {
this.subscribeQueue.splice(index, 1)
}
}
}