
Guide to Data In Motion in RDA Fabric

RDA Pipelines can exchange data even if they are running at different locations, as long as they are all connected to the same instance of RDA Fabric.

RDA Fabric uses RDA Streams, built on NATS, for the control path as well as the default mechanism for the data path. Any other streaming approach discussed in section 3.2 can also be used for the data path. Multiple streaming approaches may be used at the same time.


1. RDA Streams

  • Uses NATS Publish-Subscribe to move the data.
  • Each Stream name is translated into a NATS Subject.
  • Any Stream that is added to the list of Persistent Streams is automatically persisted in OpenSearch.
  • Each subscriber can use a "group" parameter to identify its consumer group. If multiple subscribers use the same group name, only one of them will receive a specific batch of data.
  • Data can be routed to multiple subscribers by using a different group name for each of the subscribers.
  • The Stream name NULL (case-insensitive) implies that no data will be written to or read from that stream.
  • RDA Streams do not have to be created in RDAF. Streams come into existence the first time data is published or subscribed.
  • List of RDA Streams used by RDA Fabric for its own operations:

| Stream Name | Description |
|---|---|
| rda_system_feature_usage | RDA Portal and Studio capture all usage metrics by feature. This report is accessible via the left-side menu Analytics |
| rda_system_deployment_updates | Audit trail of changes to Service Blueprints |
| rda_system_gw_endpoint_metrics | RDA Event Gateway data ingestion metrics by ingestion endpoint |
| rda_system_worker_trace_summary | Pipeline execution statistics by Site, Bots etc. |
| rda_worker_resource_usage | Resource usage metrics published by RDA Workers |
| rda_system_gw_log_archival_updates | Log Archive metrics by RDA Event Gateway |
| rda_system_log_replays | Audit trail of Log Archive replays |
| rda_system_worker_traces | Detailed execution traces for each pipeline, published by RDA Workers |

All the above RDA Streams are also Persistent Streams
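
The group and NULL-stream rules listed above can be illustrated with a small in-memory model. This is only a sketch of the described semantics, not the RDA or NATS API: the class `StreamBus`, the stream and group names, and the round-robin choice of which group member receives a batch are all assumptions made for illustration.

```python
from collections import defaultdict
from itertools import count


class StreamBus:
    """Toy in-memory model of stream/group semantics (not the RDA or NATS API)."""

    def __init__(self):
        # stream name -> group name -> list of subscriber callbacks
        self._groups = defaultdict(lambda: defaultdict(list))
        # (stream, group) -> counter used for round-robin selection
        self._turn = defaultdict(count)

    @staticmethod
    def _is_null(stream):
        # The stream name NULL (any letter case) means "discard".
        return stream.upper() == "NULL"

    def subscribe(self, stream, group, callback):
        if self._is_null(stream):
            return  # nothing is ever read from the NULL stream
        # Streams come into existence on first use; there is no create step.
        self._groups[stream][group].append(callback)

    def publish(self, stream, batch):
        if self._is_null(stream):
            return  # nothing is written to the NULL stream
        for group, members in self._groups[stream].items():
            # Exactly one member of each group receives the batch.
            # Round-robin is an assumption; the real policy may differ.
            turn = next(self._turn[(stream, group)])
            members[turn % len(members)](batch)


bus = StreamBus()
received = defaultdict(list)

# Two workers share the group "etl"; one auditor uses its own group.
bus.subscribe("alerts", "etl", lambda b: received["etl-1"].append(b))
bus.subscribe("alerts", "etl", lambda b: received["etl-2"].append(b))
bus.subscribe("alerts", "audit", lambda b: received["audit"].append(b))
bus.subscribe("null", "etl", lambda b: received["null"].append(b))

for batch in ("batch-1", "batch-2"):
    bus.publish("alerts", batch)
bus.publish("Null", "dropped")

print(dict(received))
```

In this model each batch is delivered once per group: the two "etl" workers split the batches between them, while the "audit" subscriber sees every batch because it is alone in its group.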

  • Example RDA Service Map showing data flow between pipelines using RDA Streams:

[Figure: Servicemap]

Related Bots

Related RDA Client CLI Commands

    read-stream               Read messages from an RDA stream
    read-stream-ack           Read messages from an RDA Stream with acknowledgements enabled
    write-stream              Write data to the specified stream
    write-stream-ack          Write data to the specified stream with acknowledgement enabled

2. Bots to Manage Data Inflight

This section provides details about the bots that clean up, re-shape, aggregate, enrich, mask, and extract data as it progresses through the pipeline. RDA bots can also be used to verify the integrity of data to ensure it meets certain criteria.

2.1 Data Cleanup

Data cleanup means getting rid of unnecessary rows & columns, fixing values, and reformatting values on an as-needed basis.

The following are some of the bots that can be used to clean up data:

| Bot | Description |
|---|---|
| @dm:dedup | Deduplicates data by using specified column values |
| @dm:dropnull | Drop rows if the specified columns have null values |
| @dm:map | Map values from one column to another, using an optional mapping function and arguments |
| @dm:map-multi-proc | Same as @dm:map, but uses all available CPU cores to do parallel processing |
| @dm:eval | Map values using an evaluate function. Specify one or more column = 'expression' pairs |
| @dm:eval-multi-proc | Same as @dm:eval, but uses all available CPU cores to do parallel processing |
| *dm:filter | Applies a CFXQL filter on the input dataframe and returns rows that match the filter |
| *dm:time-filter | Filter rows using one of the timestamp based columns for a specific time range |
| @#dm:filter-using-dict | Applies CFXQL filters specified in a separate dictionary (saved dataset) and returns rows that match the filters |
| @dm:add-missing-columns | Add columns to the dataframe if they are not found |
| @dm:drop-null-columns | Drop specified columns if they have a certain percentage of null values |
| @dm:mergecolumns | Merge columns using 'include' regex and/or 'exclude' regex into a 'to' column |
| @dm:fixcolumns | Fix column names such that they contain only allowed characters |
| @dm:fixnull | Replace null values in a comma separated column list |
| @dm:fixnull-regex | Replace null values in all columns that match the specified regular expression |
| @dm:rename-columns | Rename specified column names using new_column_name = 'old_column_name' format |
| @dm:replace-data | Replace data using a regex pattern from a list of comma separated columns |
| @dm:selectcolumns | Select columns using 'include' regex and/or 'exclude' regex |
| @dm:to-type | Change data type to str or int or float for specified columns |
| @dm:change-time-format | Change timestamp from one format to another for all specified columns |
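
To make the row-level cleanup operations concrete, here is a minimal plain-Python sketch of what deduplication, null-row dropping, and null replacement do to a set of records. The function names `dedup`, `dropnull`, and `fixnull` only mirror the bot names; their signatures, the keep-first rule for duplicates, and the sample rows are assumptions, not the bots' actual behavior or parameters.

```python
def dedup(rows, columns):
    """Keep the first row seen for each combination of values in `columns`."""
    seen, kept = set(), []
    for row in rows:
        key = tuple(row.get(c) for c in columns)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


def dropnull(rows, columns):
    """Drop rows where any of `columns` is null (None)."""
    return [r for r in rows if all(r.get(c) is not None for c in columns)]


def fixnull(rows, columns, value):
    """Replace null values in `columns` with `value`, leaving other cells alone."""
    return [
        {**r, **{c: value for c in columns if r.get(c) is None}}
        for r in rows
    ]


# Made-up sample data for illustration.
rows = [
    {"host": "web-1", "ip": "10.0.0.1", "owner": None},
    {"host": "web-1", "ip": "10.0.0.1", "owner": "ops"},   # duplicate host
    {"host": "db-1", "ip": None, "owner": "dba"},          # null ip
    {"host": "app-1", "ip": "10.0.0.3", "owner": None},
]

cleaned = fixnull(dropnull(dedup(rows, ["host"]), ["ip"]), ["owner"], "unknown")
for row in cleaned:
    print(row)
```

The order of the steps matters: deduplicating first keeps the earlier "web-1" row, whose null owner is then filled with the placeholder value.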

2.2 Data Re-shaping

The RDA cfxdm extension provides many bots to re-shape inflight data.

Row Based Re-shaping

Following are some of the bots that can do row based re-shaping of inflight data:

| Bot | Description |
|---|---|
| @dm:empty | Creates an empty dataframe. If used as a Sink bot, it empties the previous dataframe |
| @dm:addrow | Adds a new row with specified columns and fixed values |
| @dm:head | Keep first n rows of the dataframe and discard the rest |
| @dm:tail | Keep last n rows of the dataframe and discard the rest |
| @dm:sample | Randomly select rows from input. Can be used to select a subset of rows or replicate rows randomly to create more rows |
| @dm:sort | Sort rows in ascending or descending order using specified column values |
| @dm:concat | Loads one or more saved datasets, concatenates them and returns a single dataframe |
| @dm:dedup | Deduplicates data by using specified column values |
| @dm:dropnull | Drop rows if the specified columns have null values |
| @dm:explode | Splits a specified column value using a separator and replicates the split values into multiple rows. It keeps the rest of the columns intact |
| @dm:implode | Merge values from different rows into a single value for that column |
| @dm:explode-json | Splits a specified column value as if it were a JSON dict or list and replicates the split values into multiple rows. It keeps the rest of the columns intact |
| *dm:filter | Applies a CFXQL filter on the input dataframe and returns rows that match the filter |
| *dm:time-filter | Filter rows using one of the timestamp based columns for a specific time range |
| @#dm:filter-using-dict | Applies CFXQL filters specified in a separate dictionary (saved dataset) and returns rows that match the filters |
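
The explode and implode operations described in the table are inverses of each other, which a short plain-Python sketch can show. The helper names, their parameters, the comma separator, and the choice to group by the remaining key columns when imploding are assumptions made for illustration; the bots' real parameters are not shown in this guide.

```python
def explode(rows, column, sep=","):
    """Split `column` on `sep` and emit one row per piece, copying other columns."""
    out = []
    for row in rows:
        for part in str(row[column]).split(sep):
            out.append({**row, column: part.strip()})
    return out


def implode(rows, key_columns, column, sep=","):
    """Merge `column` values of rows sharing `key_columns` into one joined value."""
    merged = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        if key in merged:
            merged[key][column] += sep + str(row[column])
        else:
            merged[key] = {**row, column: str(row[column])}
    return list(merged.values())


# Made-up sample data for illustration.
rows = [
    {"host": "web-1", "tags": "prod, linux"},
    {"host": "db-1", "tags": "prod"},
]

exploded = explode(rows, "tags")
imploded = implode(exploded, ["host"], "tags")

print(exploded)
print(imploded)
```

Exploding turns the two input rows into three, one per tag; imploding on `host` folds them back into two rows with joined tag values.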

Column Based Re-shaping

Following are some of the bots that can do column based re-shaping of inflight data:

| Bot | Description |
|---|---|
| @dm:map | Map values from one column to another, using an optional mapping function and arguments |
| @dm:map-multi-proc | Same as @dm:map, but uses all available CPU cores to do parallel processing |
| @dm:eval | Map values using an evaluate function. Specify one or more column = 'expression' pairs |
| @dm:eval-multi-proc | Same as @dm:eval, but uses all available CPU cores to do parallel processing |
| @dm:add-missing-columns | Add columns to the dataframe if they are not found |
| @dm:drop-null-columns | Drop specified columns if they have a certain percentage of null values |
| @dm:fixcolumns | Fix column names such that they contain only allowed characters |
| @dm:mergecolumns | Merge columns using 'include' regex and/or 'exclude' regex into a 'to' column |
| @dm:rename-columns | Rename specified column names using new_column_name = 'old_column_name' format |
| @dm:selectcolumns | Select columns using 'include' regex and/or 'exclude' regex |
| @dm:transpose | Transposes columns to rows |


2.3 Data Extraction

The following bots can be used to extract data from unstructured logs & events:

| Bot | Description |
|---|---|
| @dm:eval | Simple parsing & splitting by a word can be accomplished using this bot |
| @dm:eval-multi-proc | Simple parsing & splitting by a word can be accomplished using this bot |
| @dm:extract | Extract data using Python Named Capturing Groups in Regular Expressions |
| @dm:grok | Extract using Grok Patterns. See RDA Supported Grok Patterns |
| @dm:grok-multi-proc | Extract using Grok Patterns. See RDA Supported Grok Patterns. Uses multiple processes to accomplish parallel processing of data |

Example Pipeline snippets for Grok Parsing

    @dm:empty
        --> @dm:addrow
            raw = "<85>Mar 21 03:22:17 cfx-rda-worker-vm01 sshd[414991]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.101.101  user=macaw"
        --> @dm:grok 
            column = 'raw' & 
            pattern = "%{SYSLOGLINE}"

Output for the above example

| Column | Value |
|---|---|
| logsource | cfx-rda-worker-vm01 |
| priority | |
| facility | |
| timestamp | Mar 21 03:22:17 |
| program | sshd |
| message | pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.101.101 user=macaw |
| pid | 414991 |
| timestamp8601 | |
| meta_grok_message | |
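
Since @dm:extract is described as using Python named capturing groups, the same sample line can be parsed with Python's `re` module to show the technique. This is a sketch in plain Python rather than a pipeline: the pattern below is hand-written for this one syslog shape, the group name `pri` is my own choice, and the parameters that @dm:extract itself accepts are not shown in this guide.

```python
import re

# Each (?P<name>...) group becomes one extracted column.
SYSLOG_RE = re.compile(
    r"^<(?P<pri>\d+)>"
    r"(?P<timestamp>\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<logsource>\S+) "
    r"(?P<program>[^\[\s:]+)(?:\[(?P<pid>\d+)\])?: "
    r"(?P<message>.*)$"
)

raw = (
    "<85>Mar 21 03:22:17 cfx-rda-worker-vm01 sshd[414991]: "
    "pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 "
    "tty=ssh ruser= rhost=192.168.101.101  user=macaw"
)

match = SYSLOG_RE.match(raw)
# groupdict() maps every named group to the text it captured.
fields = match.groupdict() if match else {}

for name, value in fields.items():
    print(f"{name}: {value}")
```

A hand-written pattern like this gives full control over column names, whereas a Grok pattern such as %{SYSLOGLINE} trades that control for a reusable, prebuilt expression.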


2.4 Data Enrichment

Following bots can be used to enrich inflight data with other datasets:

| Bot | Description |
|---|---|
| @dm:enrich | Enrich the input dataframe using a saved dictionary dataset |
| @dm:enrich-using-ip-cidr | Enrich the input dataframe using a saved dictionary dataset. Match IP address in input dataframe with CIDRs specified in the dictionary |
| @dm:enrich-using-ip-cidr-multi-proc | Same as @dm:enrich-using-ip-cidr, but uses all available processors |
| @dm:enrich-using-rule-dict | Enrich using rule based dictionary which contains 'rule' column |
| @dm:dns-ip-to-name | Perform reverse DNS lookup to map IP Addresses to Hostnames on specified columns |
| @dm:dns-name-to-ip | Perform DNS lookup to map Hostnames to IP Addresses on specified columns |
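
CIDR-based enrichment, as described for @dm:enrich-using-ip-cidr, amounts to matching each row's IP address against the networks in a dictionary and copying over the matching row's columns. The sketch below uses Python's standard `ipaddress` module. The dictionary contents, the column names (`cidr`, `site`, `zone`), the function `enrich_by_cidr`, and the rule that the most specific (longest-prefix) network wins on overlap are all assumptions; the guide does not say how the bot resolves overlapping CIDRs.

```python
import ipaddress

# Hypothetical dictionary dataset: one CIDR per row plus enrichment columns.
cidr_dict = [
    {"cidr": "10.0.0.0/8", "site": "corp", "zone": "internal"},
    {"cidr": "10.20.0.0/16", "site": "dc-west", "zone": "internal"},
    {"cidr": "192.168.101.0/24", "site": "lab", "zone": "test"},
]


def enrich_by_cidr(rows, ip_column, dictionary, enrich_columns):
    """Add `enrich_columns` from the most specific CIDR containing each IP."""
    # Longest prefix first, so the first hit is the most specific network.
    nets = sorted(
        ((ipaddress.ip_network(d["cidr"]), d) for d in dictionary),
        key=lambda pair: pair[0].prefixlen,
        reverse=True,
    )
    out = []
    for row in rows:
        ip = ipaddress.ip_address(row[ip_column])
        hit = next((d for net, d in nets if ip in net), None)
        # Unmatched addresses get None in every enrichment column.
        extra = {c: (hit[c] if hit else None) for c in enrich_columns}
        out.append({**row, **extra})
    return out


rows = [{"ip": "10.20.5.9"}, {"ip": "192.168.101.101"}, {"ip": "8.8.8.8"}]
enriched = enrich_by_cidr(rows, "ip", cidr_dict, ["site", "zone"])

for row in enriched:
    print(row)
```

Here 10.20.5.9 falls inside both 10.0.0.0/8 and 10.20.0.0/16, and the sketch picks the /16 entry; 8.8.8.8 matches nothing and is left with empty enrichment columns.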

Enrichment using Geo Location Mapping

If the data contains IP Addresses (IPv4 or IPv6), the following bot can be used to enrich it with geographical location information:

| Bot | Description |
|---|---|
| @geomap:lookup | Enrich the input dataframe using Geo IP mapping |

@dm:empty
    --> @dm:addrow
            ip = "104.28.124.67"
    --> @dm:addrow
            ip = "178.197.234.26"
    --> @geomap:lookup
            ipaddress_column = "ip"

Output for the above example

| Column | Value |
|---|---|
| geo_accuracy_radius | 1000.0 |
| geo_autonomous_system_number | 13335.0 |
| geo_autonomous_system_organization | CLOUDFLARENET |
| geo_city_name | Fremont |
| geo_continent_code | |
| geo_continent_name | North America |
| geo_country_iso_code | US |
| geo_country_name | United States |
| geo_is_in_european_union | 0 |
| geo_is_satellite_provider | 0 |
| geo_latitude | 37.562 |
| geo_locale_code | en |
| geo_longitude | -122.0 |
| geo_metro_code | 807.0 |
| geo_network | 104.28.0.0/15 |
| geo_postal_code | 94536 |
| geo_status | found |
| geo_subdivision_1_iso_code | CA |
| geo_subdivision_1_name | California |
| geo_subdivision_2_iso_code | |
| geo_subdivision_2_name | |
| geo_time_zone | America/Los_Angeles |
| ip | 104.28.124.67 |

Enrichment using NLP Techniques

If the data contains natural language text, following NLP techniques can be used to enrich the data:

  • Keyword Extraction
  • Named Entity Recognition
  • Summarization
  • Sentiment Analysis

Refer to Natural Language Processing (NLP) in RDA for details on how to accomplish this in RDA.



2.5 Data Integrity

RDA Fabric provides several ways to ensure integrity of the data moving through the fabric.

Following is a brief summary of different ways RDA Pipelines can check for data integrity and possibly take action:

| Integrity Check Type | Description |
|---|---|
| Using Checksum | Using bots @dm:add-checksum and @dm:verify-checksum, a pipeline can ensure that data has not been tampered with between an origin and a destination. |
| Basic Column Checks | Using bot @dm:check-columns, a pipeline can look for the existence of specific columns and take action on the data. If the intent is to clean up the data based on the amount of nulls in the data, bots @dm:drop-null-columns, @dm:dropnull, @dm:fixnull, @dm:fixnull-regex can be used. |
| Validating Against Schema | RDA can use JSON Schema to validate any data moving through the fabric. Bot @dm:validate-data validates any incoming data against the specified schema. Optionally, RDA Datasets can be bound to a specific JSON Schema, and RDA validates the dataset against it every time the dataset is saved or updated. |
| Validating Using Rules Dictionary | Bot @dm:check-integrity uses a rules dictionary to validate the data and take an action. Each rule would typically specify expected data types for a certain set of columns, whether nulls are allowed, the range of values allowed for numeric columns, and regex patterns for string data types. Bot @dm:filter-using-dict can be used to drop specific rows based on criteria. Bot @dm:enrich-using-rule-dict can be used to check for very complex data types and relationships, and to tag each row for further action. |
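
The checksum approach in the table can be sketched in plain Python: compute a digest over selected columns at the origin, then recompute and compare it at the destination. The hashing scheme used by @dm:add-checksum and @dm:verify-checksum is not stated in this guide, so SHA-256 over a canonical JSON encoding, the function names, and the column names `checksum` and `checksum_ok` are assumptions.

```python
import hashlib
import json


def row_checksum(row, columns):
    """SHA-256 over a canonical JSON encoding of the selected columns."""
    payload = json.dumps(
        {c: row.get(c) for c in columns},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def add_checksum(rows, columns, to="checksum"):
    """Origin side: attach a checksum column to every row."""
    return [{**r, to: row_checksum(r, columns)} for r in rows]


def verify_checksum(rows, columns, checksum_column="checksum"):
    """Destination side: recompute each checksum and flag mismatches."""
    return [
        {**r, "checksum_ok": r.get(checksum_column) == row_checksum(r, columns)}
        for r in rows
    ]


# Made-up sample data for illustration.
sent = add_checksum(
    [{"id": 1, "amount": 100}, {"id": 2, "amount": 250}],
    ["id", "amount"],
)

# Simulate a change in transit to the second row.
arrived = [dict(sent[0]), {**sent[1], "amount": 999}]
result = verify_checksum(arrived, ["id", "amount"])

print([r["checksum_ok"] for r in result])
```

A plain hash like this detects changes only while the checksum column itself is trusted; if a party in the middle can rewrite both the data and the checksum, a keyed digest such as an HMAC would be needed instead.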


2.6 Data Masking

RDA provides a couple of ways to mask data. The first one uses @dm:mask.

Example pipeline

@dm:empty
    --> @dm:addrow
            phone_number = "925-555-5565" &
            ssn = "123-56-8008" &
            email = "john.doe@acme.com"
    ## mask phone number except last 3 digits
    --> @dm:mask
            columns = "phone_number" & pos = -3 & char = "*"
    ## mask SSN except last 4 digits
    --> @dm:mask
            columns = "ssn" & pos = -4 & char = "*"
    ## mask email except first 3 characters
    --> @dm:mask
            columns = "email" & pos = 3 & char = "*"

Output for the pipeline would be:

| Column | Value |
|---|---|
| email | joh************** |
| phone_number | *********565 |
| ssn | *******8008 |
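
The meaning of the pos parameter can be read off the example: a negative value keeps that many trailing characters, and a positive value keeps that many leading characters. A minimal Python sketch of that rule follows. The function `mask` is hypothetical, and its behavior is inferred from the example output above, not taken from the bot's implementation.

```python
def mask(value, pos, char="*"):
    """Mask `value`, keeping the first `pos` characters (pos >= 0)
    or the last `-pos` characters (pos < 0)."""
    text = str(value)
    if pos >= 0:
        keep = text[:pos]
        return keep + char * (len(text) - len(keep))
    keep = text[pos:]
    return char * (len(text) - len(keep)) + keep


print(mask("925-555-5565", -3))       # keeps the last 3 characters
print(mask("123-56-8008", -4))        # keeps the last 4 characters
print(mask("john.doe@acme.com", 3))   # keeps the first 3 characters
```

Note that under this rule separators such as "-" and "@" are masked along with everything else, so the masked value preserves only the original length and the kept characters.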

Certain types of masking can also be done using @dm:eval, which allows more complex masking. For example, to keep the first 3 digits and the last 3 digits of a phone number intact, we can do it this way:

@dm:empty
    --> @dm:addrow
            phone_number = "925-555-5565" &
            ssn = "123-56-8008" &
            email = "john.doe@acme.com"
    --> @dm:eval
            ssn = "'*****'+ssn.split('-')[-1]" &
            phone_number = "phone_number[:3]+'*-***-*'+phone_number[-3:]" &
            email = "'***'+email.split('@')[0][-5:]+'@xxxxx.yyy'"

Output for this pipeline would be

| Column | Value |
|---|---|
| email | ***n.doe@xxxxx.yyy |
| phone_number | 925*-***-*565 |
| ssn | *****8008 |
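
The three expressions in the pipeline read as ordinary Python string operations, so they can be checked outside RDA. The sketch below evaluates them against the same sample row with Python's built-in `eval`; this assumes @dm:eval exposes each column value under its column name, which the example suggests but this guide does not state.

```python
row = {
    "phone_number": "925-555-5565",
    "ssn": "123-56-8008",
    "email": "john.doe@acme.com",
}

# The same column = 'expression' pairs used in the pipeline above.
expressions = {
    "ssn": "'*****'+ssn.split('-')[-1]",
    "phone_number": "phone_number[:3]+'*-***-*'+phone_number[-3:]",
    "email": "'***'+email.split('@')[0][-5:]+'@xxxxx.yyy'",
}

# Evaluate each expression with the row's columns as local names.
# Builtins are emptied because these expressions need only str methods.
masked = {
    column: eval(expr, {"__builtins__": {}}, dict(row))
    for column, expr in expressions.items()
}

for column, value in masked.items():
    print(column, value)
```

Each expression here refers only to its own column, so the result does not depend on whether expressions see the original or the already-updated values.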


2.7 Data Aggregations

The RDA cfxdm extension provides many bots that can be used for data aggregations.

Computing Time Series Data Histograms

Histograms convert time series data such as logs & metrics into 'events per time-interval'. Let us use the sample-servicenow-incidents dataset, which contains ServiceNow incidents, as our input dataset. Let us count how many tickets have been opened by month since Jan 1, 2020.

Code Snippet: Histogram

The following snippet does the following:

  • Download data from a URL
  • Filter the data to retain incidents opened since Jan 1, 2020. We will just retain the sys_created_on column for this purpose
  • Use the @dm:hist bot to compute the histogram
  • Change the timestamp format for sys_created_on to show year & month only.

@files:loadfile
            filename = 'https://bot-docs.cloudfabrix.io/data/datasets/sample-servicenow-incidents.csv'
    --> *dm:filter
            sys_created_on after '2020-01-01 00:00:00'
            GET sys_created_on
    --> @dm:hist
            timestamp = "sys_created_on" &
            interval = "30d"
    --> @dm:change-time-format
            columns = "sys_created_on" &
            from_format = "ns" &
            to_format = "%Y-%m"

Output:

| sys_created_on | count |
|---|---|
| 2020-06 | 1 |
| 2020-07 | 0 |
| 2020-08 | 0 |
| 2020-09 | 0 |
| 2020-10 | 0 |
| 2020-11 | 16 |
| 2020-12 | 15 |
| 2021-01 | 0 |
| 2021-02 | 9 |
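
The bucketing behind a time series histogram can be sketched in a few lines of Python: divide the time range into fixed-width intervals and count the timestamps in each, keeping empty intervals as zero counts. The function `histogram`, the made-up timestamps, and the choice to start the first bucket at the earliest timestamp are assumptions; how @dm:hist anchors its buckets is not stated in this guide.

```python
from datetime import datetime, timedelta


def histogram(timestamps, interval):
    """Count timestamps per fixed-width bucket, including empty buckets."""
    if not timestamps:
        return []
    start = min(timestamps)
    # timedelta // timedelta yields the integer bucket index.
    counts = [0] * ((max(timestamps) - start) // interval + 1)
    for ts in timestamps:
        counts[(ts - start) // interval] += 1
    return [(start + i * interval, n) for i, n in enumerate(counts)]


# Made-up ticket creation times for illustration.
opened = [
    datetime(2020, 6, 10),
    datetime(2020, 11, 2),
    datetime(2020, 11, 20),
    datetime(2020, 12, 15),
]

hist = histogram(opened, timedelta(days=30))
labels = [bucket_start.strftime("%Y-%m") for bucket_start, _ in hist]

for label, (_, n) in zip(labels, hist):
    print(label, n)
```

In this sketch a bucket is labeled with the month of its start time, and 30-day buckets drift against calendar months: the ticket opened on 2 Nov lands in the bucket that starts on 8 Oct and is therefore labeled 2020-10. A 30-day interval is an approximation of "by month", not an exact calendar grouping.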

Computing Time Series Data Histograms with GroupBy

Instead of counting the number of tickets opened by month, let us count the number of tickets opened by month, grouped by the priority of the incident. We will use the bot @dm:hist-groupby.

Code Snippet: Histogram with GroupBy

@files:loadfile
            filename = 'https://bot-docs.cloudfabrix.io/data/datasets/sample-servicenow-incidents.csv'
    --> *dm:filter
            sys_created_on after '2020-01-01 00:00:00'
            GET sys_created_on, priority
    --> @dm:hist-groupby
            timestamp = "sys_created_on" &
            interval = "30d" &
            groupby = "priority" & 
            align = "no"
    --> @dm:change-time-format
            columns = "sys_created_on" &
            from_format = "ns" &
            to_format = "%Y-%m"

Output:

| sys_created_on | count | priority |
|---|---|---|
| 2020-11 | 10 | 1 - Critical |
| 2020-12 | 0 | 1 - Critical |
| 2021-01 | 5 | 1 - Critical |
| 2020-11 | 3 | 2 - High |
| 2020-12 | 0 | 2 - High |
| 2021-01 | 1 | 2 - High |
| 2020-11 | 4 | 3 - Moderate |
| 2020-12 | 0 | 3 - Moderate |
| 2021-01 | 2 | 3 - Moderate |
| 2020-12 | 2 | 4 - Low |
| 2020-06 | 1 | 5 - Planning |
| 2020-07 | 0 | 5 - Planning |
| 2020-08 | 0 | 5 - Planning |
| 2020-09 | 0 | 5 - Planning |
| 2020-10 | 0 | 5 - Planning |
| 2020-11 | 3 | 5 - Planning |
| 2020-12 | 9 | 5 - Planning |
| 2021-01 | 0 | 5 - Planning |
| 2021-02 | 1 | 5 - Planning |
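
In the output above, each priority has its own range of months (Critical starts at 2020-11, Planning at 2020-06), which suggests that with align = "no" every group is bucketed over its own time range. The Python sketch below reproduces that shape under this reading. The function `hist_groupby`, the column names, and the sample rows are assumptions, and the interpretation of align is inferred from the output, not from the bot's documentation.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def hist_groupby(rows, ts_col, group_col, interval):
    """Per-group histogram; each group's buckets start at its own first timestamp."""
    by_group = defaultdict(list)
    for row in rows:
        by_group[row[group_col]].append(row[ts_col])

    out = []
    for group in sorted(by_group):
        stamps = by_group[group]
        start = min(stamps)
        counts = [0] * ((max(stamps) - start) // interval + 1)
        for ts in stamps:
            counts[(ts - start) // interval] += 1
        out.extend((start + i * interval, n, group) for i, n in enumerate(counts))
    return out


# Made-up incidents for illustration.
rows = [
    {"opened": datetime(2020, 11, 5), "priority": "1 - Critical"},
    {"opened": datetime(2021, 1, 10), "priority": "1 - Critical"},
    {"opened": datetime(2020, 12, 20), "priority": "4 - Low"},
]

table = [
    (bucket_start.strftime("%Y-%m"), n, group)
    for bucket_start, n, group in hist_groupby(
        rows, "opened", "priority", timedelta(days=30)
    )
]

for line in table:
    print(line)
```

Because the groups are not aligned to a common range, the "4 - Low" group produces a single bucket while "1 - Critical" produces three, including one empty bucket between its two incidents.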