How To Log Databricks Workflows with the Elastic (ELK) Stack

by Yury Kalbaska | Jul 2024

A practical example of setting up observability for a data pipeline using best practices from the software engineering world

Logging Databricks workflows to Elasticsearch
Photo by ThisisEngineering on Unsplash

At the time of this writing (July 2024), Databricks has become a standard platform for data engineering in the cloud. This rise to prominence highlights the importance of features that support robust data operations (DataOps). Among these features, observability capabilities (logging, monitoring, and alerting) are essential for a mature and production-ready data engineering tool.

There are many tools for logging, monitoring, and alerting on Databricks workflows, including the built-in native Databricks dashboards, Azure Monitor, and Datadog, among others.

However, one common scenario that is not clearly covered by the above is the need to integrate with an existing enterprise monitoring and alerting stack rather than using the dedicated tools just mentioned. More often than not, this will be the Elastic Stack (aka ELK), a de facto standard for logging and monitoring in the software development world.

Components of the ELK Stack

ELK stands for Elasticsearch, Logstash, and Kibana, three products from Elastic that together offer an end-to-end observability solution:

  1. Elasticsearch — for log storage and retrieval
  2. Logstash — for log ingestion
  3. Kibana — for visualizations and alerting

The following sections present a practical example of how to integrate the ELK Stack with Databricks to achieve a robust end-to-end observability solution.

Prerequisites

Before we move on to the implementation, ensure the following is in place:

  1. Elastic cluster: a running Elastic cluster is required. For simpler use cases, this can be a single-node setup. However, one of the key advantages of ELK is that it is fully distributed, so in a larger organization you will probably deal with a cluster running in Kubernetes. Alternatively, an instance of Elastic Cloud can be used, which is equivalent for the purposes of this example.
     If you are just experimenting, refer to the excellent guide by DigitalOcean on how to deploy an Elastic cluster to a local (or cloud) VM.
  2. Databricks workspace: ensure you have permissions to configure cluster-scoped init scripts. Administrator rights are required if you intend to set up global init scripts.

Storage

For log storage, we will use Elasticsearch’s own storage capabilities. In Elasticsearch, data is organized into indices. Each index contains multiple documents, which are JSON-formatted data structures. Before storing logs, an index must be created. This task is sometimes handled by an organization’s infrastructure or operations team, but if not, it can be done with the following command:

curl -X PUT "http://localhost:9200/logs_index?pretty"

Further customization of the index can be done as needed. For detailed configuration options, refer to the REST API reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
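Since the application side of this example is Python, here is a hedged sketch of the same index creation with explicit field mappings, using the requests library instead of curl. The mapping choices are an assumption, not a requirement, and simply mirror the sample log document below:

import requests

# Create the index with explicit mappings for the fields we intend to log.
# Run this instead of (not after) the plain curl command above, since a
# second PUT on an existing index returns an error.
index_body = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "log_level": {"type": "keyword"},
            "message": {"type": "text"},
        }
    }
}

response = requests.put("http://localhost:9200/logs_index", json=index_body)
print(response.json())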

Once the index is set up, documents can be added with:

curl -X POST "http://localhost:9200/logs_index/_doc?pretty" \
  -H 'Content-Type: application/json' \
  -d'
{
  "timestamp": "2024-07-21T12:00:00",
  "log_level": "INFO",
  "message": "This is a log message."
}'

To retrieve documents, use:

curl -X GET "http://localhost:9200/logs_index/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d'
{
  "query": {
    "match": {
      "message": "This is a log message."
    }
  }
}'

This covers the essential Elasticsearch functionality for our purposes. Next, we will set up the log ingestion process.

Transport / Ingestion

In the ELK stack, Logstash is the component responsible for ingesting logs into Elasticsearch.

The functionality of Logstash is organized into pipelines, which manage the flow of data from ingestion to output.

Each pipeline can consist of three main stages:

  1. Input: Logstash can ingest data from various sources. In this example, we will use Filebeat, a lightweight shipper, as our input source to collect and forward log files (more on this later).
  2. Filter: this stage processes the incoming data. While Logstash supports various filters for parsing and transforming logs, we will not be implementing any filters in this scenario.
  3. Output: the final stage sends the processed data to one or more destinations. Here, the output destination will be an Elasticsearch cluster.

Pipeline configurations are defined in configuration files stored in the /etc/logstash/conf.d/ directory. Upon starting the Logstash service, these configuration files are automatically loaded and executed.

You can refer to the Logstash documentation on how to set one up. An example of a minimal pipeline configuration is provided below:

input {
  beats {
    port => 5044
  }
}

filter {}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "filebeat-logs-%{+YYYY.MM.dd}"
  }
}

Finally, verify that the configuration is correct:

bin/logstash -f /etc/logstash/conf.d/test.conf --config.test_and_exit

Collecting application logs

There is one more component in ELK: Beats. Beats are lightweight agents (shippers) that deliver log (and other) data into either Logstash or Elasticsearch directly. There are various Beats, each for its own use case, but here we will focus on Filebeat, by far the most popular one, which is used to collect log files, process them, and push them to Logstash or Elasticsearch directly.

Beats need to be installed on the machines where the logs are generated. In Databricks, we will have to set up Filebeat on every cluster that we want to log from, whether All-Purpose (for prototyping, debugging in notebooks, and the like) or Job (for actual workloads). Installing Filebeat involves three steps:

  1. Installation itself: download and install the distributable package for your operating system (Databricks clusters run Ubuntu, so a Debian package should be used)
  2. Configuring the installed instance
  3. Starting the service via systemd and asserting its active status

This can be achieved with the help of init scripts (cluster-scoped init scripts are attached under the cluster’s Advanced options, while global init scripts are managed from the workspace admin settings). A minimal example init script is suggested below:

#!/bin/bash

# Check that the script is run as root
if [ "$EUID" -ne 0 ]; then
  echo "Please run as root"
  exit 1
fi

# Download the Filebeat installation package
SRC_URL="https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.14.3-amd64.deb"
DEST_DIR="/tmp"
FILENAME=$(basename "$SRC_URL")
wget -q -O "$DEST_DIR/$FILENAME" "$SRC_URL"

# Install Filebeat
export DEBIAN_FRONTEND=noninteractive
dpkg -i /tmp/filebeat-8.14.3-amd64.deb
apt-get -f install -y

# Configure Filebeat
cp /etc/filebeat/filebeat.yml /etc/filebeat/filebeat_backup.yml
tee /etc/filebeat/filebeat.yml > /dev/null <<EOL
filebeat.inputs:
- type: filestream
  id: my-application-filestream-001
  enabled: true
  paths:
    - /var/log/myapplication/*.txt
  parsers:
    - ndjson:
        keys_under_root: true
        overwrite_keys: true
        add_error_key: true
        expand_keys: true

# Replace @timestamp with the event timestamp emitted by the application logger;
# the layouts use Go reference-time notation, as expected by the Beats timestamp processor
processors:
  - timestamp:
      field: timestamp
      layouts:
        - "2006-01-02T15:04:05Z"
        - "2006-01-02T15:04:05.0Z"
        - "2006-01-02T15:04:05.00Z"
        - "2006-01-02T15:04:05.000Z"
        - "2006-01-02T15:04:05.0000Z"
        - "2006-01-02T15:04:05.00000Z"
        - "2006-01-02T15:04:05.000000Z"
      test:
        - "2024-07-19T09:45:20.754Z"
        - "2024-07-19T09:40:26.701Z"

output.logstash:
  hosts: ["localhost:5044"]

logging:
  level: debug
  to_files: true
  files:
    path: /var/log/filebeat
    name: filebeat
    keepfiles: 7
    permissions: 0644
EOL

# Start the Filebeat service
systemctl start filebeat

# Verify status
# systemctl status filebeat

Timestamp Issue

Notice how in the configuration above we set up a processor to extract timestamps. This addresses a common problem with Filebeat: by default, it populates the logs’ @timestamp field with the time the logs were harvested from the designated directory, not with the timestamp of the actual event. Although the difference is rarely more than 2–3 seconds, for a lot of applications this can mess up the logs badly; more specifically, it can scramble the order of records as they come in.

To address this, we will overwrite the default @timestamp field with values from the logs themselves.

Logging

Once Filebeat is installed and running, it will automatically collect all logs written to the designated directory, forwarding them to Logstash and subsequently down the pipeline.

Before this can happen, we need to configure the Python logging library.

The first necessary modification is to set up a FileHandler to output logs as files to the designated directory. The default logging FileHandler will work just fine.

Then we need to format the logs as NDJSON, which is required for proper parsing by Filebeat. Since this format is not natively supported by the standard Python library, we will need to implement a custom Formatter.

import datetime
import json
import logging


class NDJSONFormatter(logging.Formatter):
    def __init__(self, extra_fields=None):
        super().__init__()
        self.extra_fields = extra_fields if extra_fields is not None else {}

    def format(self, record):
        # Build a flat, JSON-serializable view of the log record.
        # Note: the trailing 'Z' assumes the host clock is UTC.
        log_record = {
            "timestamp": datetime.datetime.fromtimestamp(record.created).isoformat() + "Z",
            "log.level": record.levelname.lower(),
            "message": record.getMessage(),
            "logger.name": record.name,
            "path": record.pathname,
            "lineno": record.lineno,
            "function": record.funcName,
            "pid": record.process,
        }
        # Merge any static extra fields supplied when the formatter was created
        log_record = {**log_record, **self.extra_fields}
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_record)

We also use the custom Formatter to handle the timestamp issue discussed earlier. In the code above, a new timestamp field containing a copy of the event timestamp is added to every emitted record. This field is used by the timestamp processor in Filebeat to replace the actual @timestamp field in the published logs.

We can also use the Formatter to add extra fields, which can be helpful for distinguishing logs if your organization uses one index to collect logs from multiple applications.

Further modifications can be made as per your requirements. Once the logger has been set up, we can use the standard Python logging API (.info(), .debug(), and so on) to write logs to the log file; they will automatically propagate to Filebeat, then to Logstash, then to Elasticsearch, and finally we will be able to access them in Kibana (or any other client of our choice).
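To tie this together, below is a minimal sketch of attaching the formatter to a logger. It assumes the log directory matches the Filebeat input path configured earlier (/var/log/myapplication); the logger name and the extra field are purely illustrative:

import logging

# Write NDJSON-formatted logs to the directory watched by Filebeat.
# Assumes /var/log/myapplication exists and is writable by the job.
handler = logging.FileHandler("/var/log/myapplication/app.txt")
handler.setFormatter(NDJSONFormatter(extra_fields={"application": "my-databricks-job"}))

logger = logging.getLogger("my_application")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

logger.info("Job started")
logger.debug("Processing batch")

From here, every call to the logger produces one NDJSON line in the file, which Filebeat picks up and ships down the pipeline.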

Visualization

In the ELK stack, Kibana is the component responsible for visualizing the logs (or any other data). For the purpose of this example, we will simply use it as a glorified search client for Elasticsearch. It can, however (and is intended to), be set up as a full-featured monitoring and alerting solution, given its rich data presentation toolset.

In order to finally see our log data in Kibana, we need to set up index patterns:

  1. Navigate to Kibana.
  2. Open the “Burger Menu” (≡).
  3. Go to Management -> Stack Management -> Kibana -> Index Patterns.
  4. Click Create index pattern.

Kibana index pattern creation interface

Kibana will helpfully suggest names of the sources available for the index pattern. Type a name that captures those sources, in this example e.g. filebeat*, then click Create index pattern.

Once created, proceed to the Discover menu, select the newly created index pattern in the drop-down on the left, adjust the time interval (a common pitfall: it is set to the last 15 minutes by default), and write your first KQL query to retrieve the logs.
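For example, with the field names produced by the formatter above, a query such as log.level : "error" narrows the stream to error-level entries, while application : "my-databricks-job" (the illustrative extra field from the earlier sketch) isolates a single workload.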

Log stream visualized in Kibana

We have now successfully completed the multi-step journey from producing a log entry in a Python application hosted on Databricks to visualizing and monitoring this data through a user interface.

While this article has covered the introductory aspects of setting up a robust logging and monitoring solution using the ELK Stack together with Databricks, there are additional considerations and advanced topics that merit further exploration:

  • Choosing between Logstash and direct ingestion: evaluating whether to use Logstash for its additional data processing capabilities versus forwarding logs directly from Filebeat to Elasticsearch.
  • Schema considerations: deciding between adopting the Elastic Common Schema (ECS) and implementing custom field structures for log data.
  • Exploring alternative solutions: investigating other tools such as Azure Event Hubs and other potential log shippers that may better fit specific use cases.
  • Broadening the scope: extending these practices to encompass other data engineering tools and platforms, ensuring comprehensive observability across the entire data pipeline.

These topics will be explored in further articles.

Unless otherwise noted, all images are by the author.