Skip to main content

Pipelines

Pipelines are similar to commands, but have several important differences:

  1. They are described in the Kanalo configuration file and are executed by Kanalo immediately when a hook is fired, instead of being sent to Kanalo by an external backend service.
  2. Because of this, they are much faster than normal commands and don't require any outside infrastructure.
  3. However, unlike a command, which could have access to stored information, the message data, or other computation on the backend before being returned, they can only use preset strings and the event metadata for their actions.

Format and available commands

Pipelines are written in the configuration file using the following form, as a list under the commands property of a hook:

command: 'message'
params:
message: Your IP is ${IPADDR}

The command property holds the string name of the command to be invoked. The params property is an object containing the arguments for the command, which depends on the command invoked. The available commands, and their parameters, are:

MethodParametersDescription
message- sockets (optional string[]): The IDs of socket(s) to message
- tags (optional string[]): The tag(s) to message
- message (string): The data to send
Sends the data directly to the specified sockets or all sockets associated with the given tags. If neither sockets or tags are included the message is sent to the socket that triggered the event.
disconnect- sockets (optional string[]): The IDs of socket(s) to disconnect
- tags (optional string[]): The tag(s) identifying sockets to disconnect
- code (number): The disconnect code (see here for valid codes)
- reason (string): The disconnect message
Disconnects the specified socket(s) or all sockets associated with the given tags. If neither sockets or tags are included the socket that triggered the event is disconnected.
tag- sockets (string[]): The IDs of socket(s) to add
- tags (optional string[]): The tag(s) identifying sockets to add
- tagsToAdd (string[]): The tag(s) each socket is added to
Associate the specified socket(s) with the specified tag(s).
untag- sockets (string[]): The IDs of socket(s) to remove
- tags (optional string[]): The tag(s) identifying sockets to remove
- tagsToRemove (string[]): The tag(s) each socket is removed from
De-associate the specified socket(s) from the specified tag(s).

As mentioned in point three above, the strings passed to the commands are set when the configuration file is applied. However, they can include metadata that will vary from event to event.

Metadata

Pipeline commands have access to the event metadata for their string arguments, using the same braces syntax (${variable}) as rules strings and url strings in the configuration file. The properties Pipelines have access to are:

Properties:

  • ID (string): The socket's id value.
  • IPADDR (string): The IP address of the socket.
  • TenantName (string): The name of the tenant.
  • ClientName (string): The name of the client application the socket connected with.
  • DATA (string): The contents of the message (stringified if not already a string).

Functions:

  • Tags(string): Takes a single tag string as an argument and will return true if the socket has that tag or false otherwise.
  • TagsMatch(...string): Takes one or more glob pattern strings as arguments and will return the first tag matching one of the patterns.
  • TagsRegex(string): Takes an regex string as an argument and will return the first tag matching it.
  • Claims(string): Returns the value of the claim key passed as an argument. Claim values that are objects will be stringified.
  • ClaimsMatch(...string): Takes one or more glob pattern strings as arguments and will return the value of the first key in the claims matching one of the patterns. Will not return non-string claim values.
  • ClaimsRegex(string): Takes an regex string as an argument and will return the value of the first key in the claims that it matches. Will not return non-string claims.
  • Headers(string): Returns the value of the header passed as an argument. Header values that are objects will be stringified.
  • HeadersMatch(...string): Takes one or more glob pattern strings as arguments and will return the value of the first header matching one of the patterns. Will not return non-string values.
  • HeadersRegex(string): Takes an regex string as an argument and will return the value of the first header that it matches. Will not return non-string values.
  • Query(string): Returns the value of the query string parameter passed as an argument. Values that are objects will be stringified.
  • QueryMatch(...string): Takes one or more glob pattern strings as arguments and will return the value of the first query string parameter matching one of the patterns. Will not return non-string values.
  • QueryRegex(string): Takes an regex string as an argument and will return the value of the first query string parameter that it matches. Will not return non-string values.
  • Json(item, path): Takes a JSON string, parses it, and returns the value of path. Eg. Json('${DATA}', '$.name') will extract the name field from the message data. Path syntax can be found here.
  • MatchTest(string, pattern): Tests if the given string matches the given glob pattern.
  • MatchExtract(string, pattern): Returns the first substring from the given string that matches the given glob pattern.
  • RegexTest(string, regex): Validates if the given string matches the given regex string.
  • RegexExtract(string, regex): Returns the first match when the given regex is executed on the given string.

For functions, prefer to use regular matching over glob matching over regex for simplicity. For basic glob syntax, see wikipedia Regular expressions passed to the function must be RE2 compliant.

Examples and use cases

The following are a few simple examples of how Pipelines can be used.

Echo

${DATA} can be used to echo messages back to the socket. When your tenant is first created, this is how the default echo it is pre-configured with is set up. Under hooks in the config file:

MESSAGE:
- name: Echo
commands:
- command: message
params:
message: ${DATA}

Tagging a user with an identifier

When a socket connects, you could use a custom claim on their authentication token to tag the socket with their user id on your system.

Using your authentication provider, add a claim to every connecting token, say user_id, and have it assigned the user's id.

Then under hooks in the config file you can use the following command:

CONNECT:
- name: TagUserById
commands:
- command: tag
params:
tagsToAdd:
- ${Claims('user_id')}

Saying hello

Similar to the tagging example, if the token contains the user's real name, we could use this to send a message to them when they connect. If the claim is name:

CONNECT:
- name: Welcome
commands:
- command: message
params:
message: Welcome ${Claims('name')}!

Chat system

This gist implements a primitive multi-user chat system using only pipelines. Users declare their usernames as a query parameter.