Guide to Data In Motion in RDA Fabric
RDA Pipelines can exchange data even if they are running at different locations, as long as they are all connected to the same instance of RDA Fabric.
RDA Fabric uses RDA Streams (built on NATS) for the control path and as the default mechanism for the data path. Any other streaming approach discussed in section 3.2 can also be used for the data path, and multiple streaming approaches may be used at the same time.
1. RDA Streams
- Uses NATS Publish-Subscribe to move the data
- Each Stream name is translated into a NATS Subject.
- Any Stream added to the list of Persistent Streams is automatically persisted in OpenSearch.
- Each subscriber can use a "group" parameter to identify its consumer group. If multiple subscribers use the same group name, only one of them will receive any given batch of data.
- Data can be routed to multiple subscribers by using a different group name for each of the subscribers.
- A stream name of NULL (case-insensitive) implies no data will be written to or read from that stream.
- RDA Streams do not have to be created in RDAF ahead of time. A stream comes into existence the first time data is published to it or it is subscribed to.
- List of RDA Streams used by RDA Fabric for its own operations:
Stream Name | Description |
---|---|
rda_system_feature_usage | Usage metrics by feature, captured by RDA Portal and Studio. This report is accessible via the left-side menu under Analytics |
rda_system_deployment_updates | Audit trail of changes to Service Blueprints |
rda_system_gw_endpoint_metrics | RDA Event Gateway data ingestion metrics by ingestion endpoint |
rda_system_worker_trace_summary | Pipeline execution statistics by Site, Bots etc. |
rda_worker_resource_usage | Resource usage metrics published by RDA Workers |
rda_system_gw_log_archival_updates | Log Archive metrics by RDA Event Gateway |
rda_system_log_replays | Audit trail of Log Archive replays |
rda_system_worker_traces | Detailed execution traces for each pipeline, published by RDA Workers |
All the above RDA Streams are also Persistent Streams
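The consumer-group delivery semantics described above can be illustrated with a small in-memory model. This is a plain-Python sketch of the behavior, not the NATS client API; the class, group names, and batch labels are all made up for illustration:

```python
from collections import defaultdict
from itertools import cycle

class StreamModel:
    """Toy in-memory model of RDA Stream delivery semantics:
    every group sees every batch, but only one subscriber
    within a group receives any given batch."""

    def __init__(self):
        self.groups = defaultdict(list)   # group name -> subscriber callbacks
        self._rr = {}                     # group name -> round-robin iterator

    def subscribe(self, group, callback):
        self.groups[group].append(callback)
        self._rr[group] = cycle(self.groups[group])

    def publish(self, batch):
        for group in self.groups:
            next(self._rr[group])(batch)  # one subscriber per group gets the batch

received = defaultdict(list)
stream = StreamModel()
# two subscribers sharing a group: batches are load-balanced between them
stream.subscribe("analytics", lambda b: received["analytics-1"].append(b))
stream.subscribe("analytics", lambda b: received["analytics-2"].append(b))
# a different group name: this subscriber gets its own copy of every batch
stream.subscribe("archiver", lambda b: received["archiver-1"].append(b))

for batch in ["b1", "b2", "b3", "b4"]:
    stream.publish(batch)
```

With two subscribers in the "analytics" group, each receives half of the batches, while the lone "archiver" subscriber receives all of them.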
- Example RDA Service Map showing data flow between pipelines using RDA Streams:
Related Bots
Related RDA Client CLI Commands
Command | Description |
---|---|
read-stream | Read messages from an RDA stream |
read-stream-ack | Read messages from an RDA stream with acknowledgements enabled |
write-stream | Write data to the specified stream |
write-stream-ack | Write data to the specified stream with acknowledgements enabled |
2. Bots to Manage Data Inflight
This section provides details about various bots that cleanup, re-shape, aggregate, enrich, mask, and extract data as it progresses through a pipeline. RDA bots can also be used to verify the integrity of data and ensure it meets certain criteria.
2.1. Data Cleanup
Data cleanup involves removing unnecessary rows and columns, fixing values, and reformatting values on an as-needed basis.
The following are some of the bots that can be used for cleaning up data:
Bot | Description |
---|---|
@dm:dedup | Deduplicates data using specified column values |
@dm:dropnull | Drop rows if specified columns have null values |
@dm:map | Map values from one column to another, using an optional mapping function and arguments |
@dm:map-multi-proc | Same as @dm:map, but uses all available CPU cores for parallel processing |
@dm:eval | Map values using an evaluate function. Specify one or more column = 'expression' pairs |
@dm:eval-multi-proc | Same as @dm:eval, but uses all available CPU cores for parallel processing |
@dm:filter | Applies a CFXQL filter on the input dataframe and returns rows that match the filter |
@dm:time-filter | Filter rows using a timestamp-based column for a specific time range |
@dm:filter-using-dict | Applies CFXQL filters specified in a separate dictionary (saved dataset) and returns rows that match the filters |
@dm:add-missing-columns | Add columns to the dataframe if they are not found |
@dm:drop-null-columns | Drop specified columns if they have a certain percentage of null values |
@dm:mergecolumns | Merge columns using an 'include' regex and/or 'exclude' regex into a 'to' column |
@dm:fixcolumns | Fix column names so that they contain only allowed characters |
@dm:fixnull | Replace null values in a comma-separated column list |
@dm:fixnull-regex | Replace null values in all columns that match the specified regular expression |
@dm:rename-columns | Rename specified columns using new_column_name = 'old_column_name' format |
@dm:replace-data | Replace data using a regex pattern in a list of comma-separated columns |
@dm:selectcolumns | Select columns using an 'include' regex and/or 'exclude' regex |
@dm:to-type | Change the data type to str, int, or float for specified columns |
@dm:change-time-format | Change the timestamp from one format to another for all specified columns |
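To illustrate what a cleanup stage does, the following pandas sketch mimics the effect of a few of the bots above (@dm:dropnull, @dm:dedup, @dm:rename-columns). This is not RDA pipeline syntax, and the column names and data are made up:

```python
import pandas as pd

df = pd.DataFrame({
    "Host Name": ["web01", "web01", None, "db01"],
    "cpu_pct":   [12.5, 12.5, 3.0, 88.1],
})

# @dm:dropnull equivalent: drop rows where 'Host Name' is null
df = df.dropna(subset=["Host Name"])

# @dm:dedup equivalent: deduplicate using specified column values
df = df.drop_duplicates(subset=["Host Name", "cpu_pct"])

# @dm:rename-columns equivalent: new_column_name = 'old_column_name'
df = df.rename(columns={"Host Name": "host"})
```

Of the four input rows, the null-host row and the duplicate web01 row are dropped, leaving two clean rows.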
2.2 Data Re-shaping
RDA cfxdm extension provides many bots to re-shape inflight data.
Row Based Re-shaping
Following are some of the bots that can do row based re-shaping of inflight data:
Bot | Description |
---|---|
@dm:empty | Creates an empty dataframe. If used as a sink bot, it empties the previous dataframe |
@dm:addrow | Adds a new row with specified columns and fixed values |
@dm:head | Keep the first n rows of the dataframe and discard the rest |
@dm:tail | Keep the last n rows of the dataframe and discard the rest |
@dm:sample | Randomly select rows from the input. Can be used to select a subset of rows or to replicate rows randomly to create more rows |
@dm:sort | Sort rows in ascending or descending order using specified column values |
@dm:concat | Loads one or more saved datasets, concatenates them, and returns a single dataframe |
@dm:dedup | Deduplicates data using specified column values |
@dm:dropnull | Drop rows if specified columns have null values |
@dm:explode | Split a specified column value using a separator and replicate the split values into multiple rows, keeping the rest of the columns intact |
@dm:implode | Merge values from different rows into a single value for that column |
@dm:explode-json | Split a specified column value as a JSON dict or list and replicate the split values into multiple rows, keeping the rest of the columns intact |
@dm:filter | Applies a CFXQL filter on the input dataframe and returns rows that match the filter |
@dm:time-filter | Filter rows using a timestamp-based column for a specific time range |
@dm:filter-using-dict | Applies CFXQL filters specified in a separate dictionary (saved dataset) and returns rows that match the filters |
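As an illustration, the row-splitting behavior described for @dm:explode can be approximated in pandas (a sketch only, not RDA syntax; the columns and separator are invented):

```python
import pandas as pd

df = pd.DataFrame({
    "host": ["web01", "db01"],
    "tags": ["prod,linux", "prod"],
})

# @dm:explode equivalent: split 'tags' on the separator and replicate
# the split values into multiple rows, keeping 'host' intact
df = df.assign(tags=df["tags"].str.split(",")).explode("tags", ignore_index=True)
```

The web01 row is replicated once per tag, while db01 stays as a single row.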
Column Based Re-shaping
Following are some of the bots that can do column based re-shaping of inflight data:
Bot | Description |
---|---|
@dm:map | Map values from one column to another, using an optional mapping function and arguments |
@dm:map-multi-proc | Same as @dm:map, but uses all available CPU cores for parallel processing |
@dm:eval | Map values using an evaluate function. Specify one or more column = 'expression' pairs |
@dm:eval-multi-proc | Same as @dm:eval, but uses all available CPU cores for parallel processing |
@dm:add-missing-columns | Add columns to the dataframe if they are not found |
@dm:drop-null-columns | Drop specified columns if they have a certain percentage of null values |
@dm:fixcolumns | Fix column names so that they contain only allowed characters |
@dm:mergecolumns | Merge columns using an 'include' regex and/or 'exclude' regex into a 'to' column |
@dm:rename-columns | Rename specified columns using new_column_name = 'old_column_name' format |
@dm:selectcolumns | Select columns using an 'include' regex and/or 'exclude' regex |
@dm:transpose | Transposes columns to rows |
2.3 Data Extraction
The following bots can be used to extract data from unstructured logs & events:
Bot | Description |
---|---|
@dm:eval | Simple parsing & splitting by a word can be accomplished using this bot |
@dm:eval-multi-proc | Same as @dm:eval, but uses multiple processes for parallel processing |
@dm:extract | Extract data using Python Named Capturing Groups in Regular Expressions |
@dm:grok | Extract using Grok Patterns. See RDA Supported Grok Patterns |
@dm:grok-multi-proc | Extract using Grok Patterns. See RDA Supported Grok Patterns. Uses multiple processes for parallel processing of data |
Example Pipeline snippets for Grok Parsing
Playground
Output for the above example
Column | Value |
---|---|
logsource | cfx-rda-worker-vm01 |
priority | |
facility | |
timestamp | Mar 21 03:22:17 |
program | sshd |
message | pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.101.101 user=macaw |
pid | 414991 |
timestamp8601 | |
meta_grok_message | |
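The style of extraction that @dm:extract performs can be sketched with Python named capturing groups. The syslog line, pattern, and field names below are illustrative, not the bot's actual configuration:

```python
import re

line = "Mar 21 03:22:17 cfx-rda-worker-vm01 sshd[414991]: authentication failure"

# named capturing groups (?P<name>...) become output columns
pattern = re.compile(
    r"(?P<timestamp>\w{3}\s+\d+\s[\d:]+)\s"
    r"(?P<logsource>\S+)\s"
    r"(?P<program>\w+)\[(?P<pid>\d+)\]:\s"
    r"(?P<message>.*)"
)
fields = pattern.match(line).groupdict()
```

Each named group becomes a key in the resulting dict, mirroring how extracted fields become columns in the pipeline.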
2.4 Data Enrichment
Following bots can be used to enrich inflight data with other datasets:
Bot | Description |
---|---|
@dm:enrich | Enrich the input dataframe using a saved dictionary dataset |
@dm:enrich-using-ip-cidr | Enrich the input dataframe using a saved dictionary dataset, matching IP addresses in the input dataframe against CIDRs specified in the dictionary |
@dm:enrich-using-ip-cidr-multi-proc | Same as @dm:enrich-using-ip-cidr, but uses all available processors |
@dm:enrich-using-rule-dict | Enrich using a rule-based dictionary which contains a 'rule' column |
@dm:dns-ip-to-name | Perform reverse DNS lookup to map IP addresses to hostnames in specified columns |
@dm:dns-name-to-ip | Perform DNS lookup to map hostnames to IP addresses in specified columns |
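The CIDR-matching behavior of @dm:enrich-using-ip-cidr can be sketched with the standard-library ipaddress module. The dictionary contents and the "site" attribute here are invented for illustration:

```python
import ipaddress

# dictionary dataset: CIDR -> enrichment attributes (made-up values)
cidr_dict = {
    "10.1.0.0/16": {"site": "NYC-DC1"},
    "10.2.0.0/16": {"site": "SJC-DC2"},
}
networks = {ipaddress.ip_network(c): attrs for c, attrs in cidr_dict.items()}

def enrich(row):
    """Attach dictionary attributes to a row whose IP falls in a known CIDR."""
    ip = ipaddress.ip_address(row["ip"])
    for net, attrs in networks.items():
        if ip in net:                 # match input IP against dictionary CIDRs
            return {**row, **attrs}
    return {**row, "site": None}      # no CIDR matched

rows = [{"ip": "10.1.4.7"}, {"ip": "10.2.9.1"}, {"ip": "192.168.0.5"}]
enriched = [enrich(r) for r in rows]
```

Rows whose IP falls inside a dictionary CIDR pick up that CIDR's attributes; unmatched rows get a null value.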
Enrichment using Geo Location Mapping
If the data contains IP addresses (IPv4 or IPv6), the following bot can be used to enrich it with geographical location information:
Bot | Description |
---|---|
@geomap:lookup | Enrich the input dataframe using Geo IP mapping |
Playground
Output for the above example
Column | Value |
---|---|
geo_accuracy_radius | 1000.0 |
geo_autonomous_system_number | 13335.0 |
geo_autonomous_system_organization | CLOUDFLARENET |
geo_city_name | Fremont |
geo_continent_code | |
geo_continent_name | North America |
geo_country_iso_code | US |
geo_country_name | United States |
geo_is_in_european_union | 0 |
geo_is_satellite_provider | 0 |
geo_latitude | 37.562 |
geo_locale_code | en |
geo_longitude | -122.0 |
geo_metro_code | 807.0 |
geo_network | 104.28.0.0/15 |
geo_postal_code | 94536 |
geo_status | found |
geo_subdivision_1_iso_code | CA |
geo_subdivision_1_name | California |
geo_subdivision_2_iso_code | |
geo_subdivision_2_name | |
geo_time_zone | America/Los_Angeles |
ip | 104.28.124.67 |
Enrichment using NLP Techniques
If the data contains natural language text, the following NLP techniques can be used to enrich it:
- Keyword Extraction
- Named Entity Recognition
- Summarization
- Sentiment Analysis
Refer to Natural Language Processing (NLP) in RDA for details on how to accomplish this in RDA.
2.5 Data Integrity
RDA Fabric provides several ways to ensure integrity of the data moving through the fabric.
Following is a brief summary of different ways RDA Pipelines can check for data integrity and possibly take action:
Integrity Check Type | Description |
---|---|
Using Checksum | Using the bots @dm:add-checksum and @dm:verify-checksum, a pipeline can verify that data has not been tampered with between an origin and a destination. |
Basic Column Checks | Using the bot @dm:check-columns, a pipeline can check for the existence of specific columns and take action on the data. If the intent is to clean up the data based on the amount of nulls it contains, the bots @dm:drop-null-columns, @dm:dropnull, @dm:fixnull, and @dm:fixnull-regex can be used. |
Validating Against Schema | RDA can use JSON Schema to validate any data moving through the fabric. The bot @dm:validate-data validates incoming data against the specified schema. Optionally, RDA Datasets can be bound to a specific JSON Schema, and RDA validates the data every time the dataset is saved or updated. |
Validating Using Rules Dictionary | The bot @dm:check-integrity uses a rules dictionary to validate the data and take an action. Each rule typically specifies the expected data types for a set of columns, whether nulls are allowed, the allowed range of values for numeric columns, and regex patterns for string columns. The bot @dm:filter-using-dict can be used to drop specific rows based on criteria. The bot @dm:enrich-using-rule-dict can be used to check for more complex data types and relationships and tag each row for further action. |
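The checksum-based check can be sketched as follows: compute a digest over the serialized data at the origin, then recompute and compare at the destination. This is a plain-Python illustration of the idea, not the actual wire format used by @dm:add-checksum and @dm:verify-checksum:

```python
import hashlib
import json

def add_checksum(records):
    """Origin side: attach a SHA-256 digest of the canonicalized records."""
    payload = json.dumps(records, sort_keys=True).encode()
    return {"data": records, "checksum": hashlib.sha256(payload).hexdigest()}

def verify_checksum(envelope):
    """Destination side: recompute the digest and compare."""
    payload = json.dumps(envelope["data"], sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest() == envelope["checksum"]

envelope = add_checksum([{"host": "web01", "status": "up"}])
assert verify_checksum(envelope)           # untampered data passes

envelope["data"][0]["status"] = "down"     # simulate tampering in transit
assert not verify_checksum(envelope)       # verification now fails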
2.6 Data Masking
RDA provides a couple of ways to mask data. The first uses @dm:mask.
Example pipeline
Playground
Output for the pipeline would be:
Column | Value |
---|---|
 | joh************** |
phone_number | *********565 |
ssn | *******8008 |
Certain types of masking can also be done using @dm:eval, which allows more complex masking. For example, to keep the first 3 digits and the last 3 digits of a phone number intact:
Playground
Output for this pipeline would be:
Column | Value |
---|---|
 | ***n.doe@xxxxx.yyy |
phone_number | 925*-***-*565 |
ssn | *****8008 |
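The keep-head/keep-tail masking idea can be expressed in plain Python. This is a sketch of the transformation only, not @dm:eval syntax, and it masks characters uniformly rather than preserving punctuation as in the output above:

```python
def mask_middle(value, keep_head=3, keep_tail=3, mask_char="*"):
    """Mask everything except the first keep_head and last keep_tail characters."""
    if len(value) <= keep_head + keep_tail:
        return mask_char * len(value)   # too short: mask the whole value
    middle = mask_char * (len(value) - keep_head - keep_tail)
    return value[:keep_head] + middle + value[-keep_tail:]
```

For a ten-digit phone number this keeps the first and last three digits and stars out the four in the middle.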
2.7 Data Aggregations
RDA cfxdm extension provides many bots that can be used for data aggregations.
Computing Time Series Data Histograms
Histograms convert timeseries data such as logs & metrics into 'events per time-interval'. Let us use the sample-servicenow-incidents dataset, with ServiceNow incidents as our input, and count how many tickets were opened each month since Jan 1, 2020.
Code Snippet: Histogram
The following snippet does:
- Download data from a URL
- Filter the data to retain incidents opened since Jan 1, 2020. We will retain only the sys_created_on column for this purpose
- Use the @dm:hist bot to compute the histogram
- Change the timestamp format for sys_created_on to show year & month only
Playground
Output:
sys_created_on | count |
---|---|
2020-06 | 1 |
2020-07 | 0 |
2020-08 | 0 |
2020-09 | 0 |
2020-10 | 0 |
2020-11 | 16 |
2020-12 | 15 |
2021-01 | 0 |
2021-02 | 9 |
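The month-by-month counting that @dm:hist performs, including the zero-count months visible in the output above, can be sketched with pandas resampling. The column name sys_created_on matches the example; the dates here are made up:

```python
import pandas as pd

df = pd.DataFrame({
    "sys_created_on": pd.to_datetime([
        "2020-06-15", "2020-11-02", "2020-11-20", "2021-02-01",
    ]),
})

# count tickets per calendar month; resampling fills empty months with 0
hist = (
    df.set_index("sys_created_on")
      .resample("MS")                      # month-start bins
      .size()
      .rename("count")
      .reset_index()
)
hist["sys_created_on"] = hist["sys_created_on"].dt.strftime("%Y-%m")
```

Note that resampling emits a row for every month in the range, so quiet months appear with a count of 0 rather than being silently skipped.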
Computing Time Series Data Histograms with GroupBy
Instead of counting the number of tickets opened by month, let us count tickets opened by month grouped by the priority of the incident. We will use the bot @dm:hist-groupby.
Code Snippet: Histogram with GroupBy
Playground
Output:
sys_created_on | count | priority |
---|---|---|
2020-11 | 10 | 1 - Critical |
2020-12 | 0 | 1 - Critical |
2021-01 | 5 | 1 - Critical |
2020-11 | 3 | 2 - High |
2020-12 | 0 | 2 - High |
2021-01 | 1 | 2 - High |
2020-11 | 4 | 3 - Moderate |
2020-12 | 0 | 3 - Moderate |
2021-01 | 2 | 3 - Moderate |
2020-12 | 2 | 4 - Low |
2020-06 | 1 | 5 - Planning |
2020-07 | 0 | 5 - Planning |
2020-08 | 0 | 5 - Planning |
2020-09 | 0 | 5 - Planning |
2020-10 | 0 | 5 - Planning |
2020-11 | 3 | 5 - Planning |
2020-12 | 9 | 5 - Planning |
2021-01 | 0 | 5 - Planning |
2021-02 | 1 | 5 - Planning |
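A pandas approximation of what @dm:hist-groupby does: group by priority first, then resample each group by month so that each priority gets its own per-month counts (the data here is invented, and this is not RDA syntax):

```python
import pandas as pd

df = pd.DataFrame({
    "sys_created_on": pd.to_datetime([
        "2020-11-05", "2020-11-21", "2021-01-09", "2020-12-02",
    ]),
    "priority": ["1 - Critical", "1 - Critical", "1 - Critical", "4 - Low"],
})

# per-priority monthly histogram; months with no tickets in a group show 0
hist = (
    df.set_index("sys_created_on")
      .groupby("priority")
      .resample("MS")
      .size()
      .rename("count")
      .reset_index()
)
hist["sys_created_on"] = hist["sys_created_on"].dt.strftime("%Y-%m")
```

As in the output table above, each priority carries its own month range, with zero-count rows filling the gaps inside that range.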