How to Publish Data Collector Tables to Apache Kafka

Posted August 10, 2017 by Soniya Shah, Information Developer

This blog post was authored by Serge Bonte.

You are probably familiar with the Vertica Data Collector (DC) and have used the granular information it collects to monitor and optimize Vertica deployments. A common challenge is that the Data Collector keeps only a portion of that information (controlled by retention policies) in the internal DC tables before flushing it to log files.
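If you want to see how much DC data your database currently retains, the built-in GET_DATA_COLLECTOR_POLICY and SET_DATA_COLLECTOR_POLICY functions let you inspect and adjust a component’s retention policy. A minimal sketch, using the CatalogInfoByMinute component discussed later (the sizes passed to SET_DATA_COLLECTOR_POLICY are illustrative, not recommendations):

-- Check the current retention policy (memory and disk) for one component
=> SELECT GET_DATA_COLLECTOR_POLICY('CatalogInfoByMinute');

-- Raise retention to 2000KB in memory and 20000KB on disk (illustrative sizes)
=> SELECT SET_DATA_COLLECTOR_POLICY('CatalogInfoByMinute', '2000', '20000');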

Apache Kafka is well suited for moving large numbers of small messages. Vertica Management Console (MC) can move DC data via Kafka to another Vertica instance. If you are not using MC, or cannot create another dedicated Vertica instance for extended DC storage, this blog post describes how to publish the DC data to Kafka yourself. After the data is in Kafka, you can leverage it directly from Kafka or use it to feed another system that monitors your entire environment (not just Vertica).

Data Collector and Management Console’s Extended Monitoring

Management Console provides extended monitoring, which extends the Data Collector storage and lets you monitor and analyze historical data. As Vertica writes information to the Data Collector tables, it produces matching Kafka data messages (one topic per table). A separate Vertica instance consumes these Kafka messages and stores them in a local schema that replicates the DC tables. These extended tables are then available for reviewing and analyzing database performance, either through the Management Console or through direct access to that additional Vertica instance.

If you are not using the Management Console or you don’t have a spare Vertica instance to extend the DC storage, you can still leverage the built-in mechanisms to publish DC tables to Kafka. Let’s explore how.

Vertica Notifiers

A Vertica notifier is a built-in push-based mechanism that sends messages from Vertica to endpoints such as Kafka. Use this mechanism if you don’t want to use Management Console’s extended monitoring or you don’t have another Vertica instance for extended storage.

Create a notifier using the CREATE NOTIFIER SQL statement. For example, the following statement creates a notifier named my_notifier.

• The ACTION parameter identifies the target Kafka broker.
• The MAXMEMORYSIZE parameter defines the size of the notification queue.
• The IDENTIFIED BY parameter is the unique identifier of the notifier. If set, all messages published by this notifier will contain this attribute.

In this statement, the notifier publishes data to a Kafka broker running at 127.0.0.1 on port 9092, with a notification queue size of 10M. The statement also sets a specific parameter for communicating with Kafka:

=> CREATE NOTIFIER my_notifier
   ACTION 'kafka://127.0.0.1:9092'
   MAXMEMORYSIZE '10M'
   PARAMETERS 'queue.buffering.max.ms=1000';

The Vertica NOTIFY function allows you to publish ad hoc messages. For example, you might configure Vertica to send a message when a long-running query completes. In the next example statement, the message ‘Long Query Done’ is published to the topic LongQuery_topic on the Kafka broker that you specified when creating my_notifier:

=> SELECT NOTIFY('Long Query Done', 'my_notifier', 'LongQuery_topic');

Vertica also allows you to publish Data Collector information automatically by setting a notification policy for any of the 90+ Data Collector components. To do this, you map a DC component to a notifier and a Kafka topic.
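To see which components are available for a notification policy, you can query the v_monitor.data_collector system table. A quick sketch (DISTINCT is used because the table reports each component once per node):

-- List the Data Collector components you can attach a notify policy to
=> SELECT DISTINCT component, description
   FROM v_monitor.data_collector
   ORDER BY component;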

In the following example, the query enables a notification policy for the LoadEvents component, using the dc_notify notifier and the dc_load_events_topic Kafka topic:

=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoadEvents', 'dc_notify', 'dc_load_events_topic', 1);
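As a sanity check, the companion GET_DATA_COLLECTOR_NOTIFY_POLICY function reports the policy now attached to a component:

-- Confirm the notification policy that is now attached to LoadEvents
=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoadEvents');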

An Example: Capturing the Number of Catalog Objects

Suppose you want to monitor the CatalogInfoByMinute Data Collector component using Kafka. The CatalogInfoByMinute component captures the number of catalog objects every minute for each node. Here are the steps you need to take:

1. Create a notifier:

=> CREATE NOTIFIER dc_notify
   ACTION 'kafka://kafkahost:9092'
   MAXMEMORYSIZE '1G'
   IDENTIFIED BY 'dc_notify_example';

2. Enable the notification policy for the CatalogInfoByMinute component:

=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('CatalogInfoByMinute', 'dc_notify', 'dc_catalog_info_by_minute_topic', 1);

3. Verify that the Kafka messages are published.
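For the verification step, one approach is to confirm the Vertica side of the pipeline before inspecting the broker. A sketch, assuming you can query the DC tables directly (the column names follow the sample message shown below):

-- Confirm the policy is attached to the component
=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('CatalogInfoByMinute');

-- Confirm the component is producing rows locally (one sample per node per minute)
=> SELECT node_name, "time", object_count_end_value
   FROM dc_catalog_info_by_minute
   ORDER BY "time" DESC
   LIMIT 5;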

Here’s a sample message read by a Kafka consumer for the dc_catalog_info_by_minute_topic topic. Note that the messages are in JSON format:

{"_db":"sandbox","_schema":"v_internal","_table":"dc_catalog_info_by_minute","_uuid":"dc_notify_example",
 "ahm_epoch_end_value":156084,"ahm_epoch_peak_delta":0,
 "ahm_epoch_peak_end":"2017-05-15 23:59:00.00864+00","ahm_epoch_peak_start":"2017-05-15 23:58:59.003702+00",
 "ahm_epoch_start_value":156084,
 "ahm_timestamp_end_value":"2017-05-15 20:55:00.207907+00","ahm_timestamp_start_value":"2017-05-15 20:55:00.207907+00",
 "catalog_id":"shared",
 "catalog_version_end_value":168922,"catalog_version_peak_delta":0,
 "catalog_version_peak_end":"2017-05-15 23:59:00.00864+00","catalog_version_peak_start":"2017-05-15 23:58:59.003702+00",
 "catalog_version_start_value":168922,
 "checkpoint_epoch_end_value":156084,"checkpoint_epoch_peak_delta":0,
 "checkpoint_epoch_peak_end":"2017-05-15 23:59:00.00864+00","checkpoint_epoch_peak_start":"2017-05-15 23:58:59.003702+00",
 "checkpoint_epoch_start_value":156084,
 "current_epoch_end_value":156085,"current_epoch_peak_delta":0,
 "current_epoch_peak_end":"2017-05-15 23:59:00.00864+00","current_epoch_peak_start":"2017-05-15 23:58:59.003702+00",
 "current_epoch_start_value":156085,
 "end_time":"2017-05-15 23:59:00.00864+00",
 "first_epoch_end_value":0,"first_epoch_peak_delta":0,
 "first_epoch_peak_end":"2017-05-15 23:59:00.00864+00","first_epoch_peak_start":"2017-05-15 23:58:59.003702+00",
 "first_epoch_start_value":0,
 "k_end_value":1,"k_start_value":1,
 "node_name":"v_sandbox_node0002",
 "object_count_end_value":11152,"object_count_max_value":11152,"object_count_min_value":11152,
 "object_count_sample_count":60,"object_count_sample_sum":669120,"object_count_start_value":11152,
 "start_time":"2017-05-15 23:58:00.00518+00",
 "time":"2017-05-15 23:59:00.00871+00"}

After they have been published, the Kafka messages can be consumed by any monitoring or storage system. For example, using the Kafka messages from the CatalogInfoByMinute DC component, you can monitor the average number of catalog objects across the Vertica nodes.
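A rough SQL equivalent of that calculation, run against the local DC table rather than the Kafka stream (a consumer would compute the same aggregate over the message fields):

-- Average catalog object count per node over the locally retained window
=> SELECT node_name,
          AVG(object_count_end_value) AS avg_catalog_objects
   FROM dc_catalog_info_by_minute
   GROUP BY node_name
   ORDER BY node_name;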

Here’s a visualization from a simple node.js monitoring application built using socket.io and Kafka-rest. This graph shows that:

• The number of catalog objects stays within the same range on all nodes.
• For each node, the number of catalog objects regularly grows and contracts.

The graph also makes any abnormal behavior easy to spot.

For More Information

For additional information, see the following topics in the Vertica documentation:

Extended Monitoring
Integrating with Kafka