Adopting Spark Connect

How we use a shared Spark Connect server to make our Spark infrastructure more efficient

Image by Kanenori from Pixabay

Spark Connect is a relatively new component in the Spark ecosystem that allows thin clients to run Spark applications on a remote Spark cluster. This technology can offer some benefits to Spark applications that use the DataFrame API. Spark has long allowed SQL queries to be run on a remote Thrift JDBC server. However, the ability to remotely run client applications written in any supported language (Scala, Python) appeared only in Spark 3.4.

In this article, I will share our experience using Spark Connect (version 3.5). I will talk about the benefits we gained, technical details related to running Spark client applications, and some tips on how to make your Spark Connect setup more efficient and stable.

Spark is one of the key components of the analytics platform at Joom. We have a large number of internal users and over 1000 custom Spark applications. These applications run at different times of day, vary in complexity, and require very different amounts of computing resources (ranging from a few cores for a few minutes to over 250 cores for several days). Previously, all of them were always executed as separate Spark applications (each with its own driver and executors), which, in the case of small and medium-sized applications (we historically have many such applications), led to noticeable overhead. With the introduction of Spark Connect, it is now possible to set up a shared Spark Connect server and run many Spark client applications on it. Technically, the Spark Connect server is a Spark application with an embedded Spark Connect endpoint.

Image by author

Here are the benefits we were able to get from this:

  • Resource savings
    – When running via Spark Connect, client applications do not require their own Spark driver (which typically uses over 1.5 GB of memory). Instead, they use a thin client with a typical memory consumption of 200 MB.
    – Executor utilization improves since any executor can run the tasks of multiple client applications. For example, suppose some Spark application, at some point in its execution, starts using significantly fewer cores and less memory than initially requested. There are many reasons why this can happen. In the case of a separate Spark application, the currently unused resources are often wasted, since dynamic allocation usually does not provide efficient scale-down. With the Spark Connect server, however, the freed-up cores and memory can immediately be used to run tasks of other client applications.
  • Reduced startup wait time
    – For various reasons, we have to limit the number of concurrently running separate Spark applications, and they may wait in the queue for quite a long time if all slots are currently occupied. This can negatively affect data readiness time and user experience. In the case of the Spark Connect server, we have so far been able to avoid such limitations, and all Spark Connect client applications start running immediately after launch.
    – For ad-hoc executions, it is desirable to minimize the time to get results as much as possible and avoid keeping people waiting. In the case of separate Spark applications, launching a client application often requires provisioning additional EC2 nodes for its driver and executors, as well as initializing the driver and executors. All of this together can take more than 4 minutes. In the case of the Spark Connect server, at least its driver is always up and ready to accept requests, so it is only a matter of waiting for additional executors, and often executors are already available. This can significantly reduce the wait time for ad-hoc applications to be ready.

Our constraints

At the moment, we do not run long-running heavy applications on Spark Connect for the following reasons:

  • They may cause failures or unstable behavior of the Spark Connect server (e.g., by overflowing disks on executor nodes). This can lead to large-scale problems for the entire platform.
  • They often require unique memory settings and use specific optimization techniques (e.g., custom extraStrategies).
  • We currently have a problem with giving the Spark Connect server a lot of executors to handle a very large simultaneous load (this is related to the behavior of the Spark Task Scheduler and is beyond the scope of this article).

Therefore, heavy applications still run as separate Spark applications.

We use Spark on Kubernetes/EKS and Airflow. Some code examples will be specific to this environment.

We have too many different, constantly changing Spark applications, and it would take too much time to manually determine for each one whether it should run on Spark Connect according to our criteria. In addition, the list of applications running on Spark Connect needs to be updated regularly. For example, suppose that today some application is light enough, so we have decided to run it on Spark Connect. But tomorrow, its developers may add several large joins, making it quite heavy. Then it will be preferable to run it as a separate Spark application. The reverse situation is also possible.

Eventually, we created a service to automatically determine how to launch each particular client application. This service analyzes the history of previous runs for each application, evaluating metrics such as Total Task Time, Shuffle Write, Disk Spill, and others (this data is collected using SparkListener). Custom parameters set for the applications by developers (e.g., memory settings of drivers and executors) are also taken into account. Based on this data, the service automatically determines for each application whether it should be run this time on the Spark Connect server or as a separate Spark application. Thus, all our applications should be ready to run in either of the two ways.
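To illustrate the idea, here is a minimal sketch of what such a decision rule might look like. The metric names, thresholds, and the default for applications with no history are purely illustrative assumptions, not our production values, and the aggregation of run history into a case class is assumed to happen elsewhere.

// Hypothetical, simplified decision rule; thresholds are illustrative only.
case class RunStats(
  totalTaskTimeHours: Double,        // total task time across the run
  shuffleWriteGb: Double,
  diskSpillGb: Double,
  requestedExecutorMemoryGb: Double  // custom memory settings set by developers
)

sealed trait LaunchMode
case object SparkConnectMode extends LaunchMode
case object SeparateApplicationMode extends LaunchMode

def chooseLaunchMode(history: Seq[RunStats]): LaunchMode = {
  // Assumption: with no history (a brand-new application), we default to a
  // separate Spark application as the safer option.
  if (history.isEmpty) SeparateApplicationMode
  else {
    val recent = history.takeRight(5)
    val isHeavy = recent.exists { s =>
      s.totalTaskTimeHours > 50 ||
        s.shuffleWriteGb > 100 ||
        s.diskSpillGb > 20 ||
        s.requestedExecutorMemoryGb > 16
    }
    if (isHeavy) SeparateApplicationMode else SparkConnectMode
  }
}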

In our environment, each client application is built independently of the others and has its own JAR file containing the application code as well as specific dependencies (for example, ML applications often use third-party libraries like CatBoost and so on). The problem is that the SparkSession API for Spark Connect is somewhat different from the SparkSession API used for separate Spark applications (Spark Connect clients use the spark-connect-client-jvm artifact). Therefore, we are supposed to know at build time of each client application whether it will run via Spark Connect or not. But we do not know that. The following describes our approach to launching client applications, which eliminates the need to build and manage two versions of the JAR artifact for the same application.

For each Spark client application, we build only one JAR file containing the application code and specific dependencies. This JAR is used both when running on Spark Connect and when running as a separate Spark application. Therefore, these client JARs do not contain specific Spark dependencies. The appropriate Spark dependencies (spark-core/spark-sql or spark-connect-client-jvm) will be provided later in the Java classpath, depending on the run mode. In any case, all client applications use the same Scala code to initialize the SparkSession, which operates depending on the run mode. All client application JARs are built against the regular Spark API. So, in the part of the code intended for Spark Connect clients, the SparkSession methods specific to the Spark Connect API (remote, addArtifact) are called via reflection:

import java.util.UUID
import org.apache.spark.sql.SparkSession

val sparkConnectUri: Option[String] = Option(System.getenv("SPARK_CONNECT_URI"))

val isSparkConnectMode: Boolean = sparkConnectUri.isDefined

def createSparkSession(): SparkSession = {
  if (isSparkConnectMode) {
    createRemoteSparkSession()
  } else {
    SparkSession.builder
      // Whatever you need to do to configure SparkSession for a separate
      // Spark application.
      .getOrCreate
  }
}

private def createRemoteSparkSession(): SparkSession = {
  val uri = sparkConnectUri.getOrElse(throw new Exception(
    "Required environment variable 'SPARK_CONNECT_URI' is not set."))

  val builder = SparkSession.builder
  // Reflection is used here because the regular SparkSession API does not
  // contain these methods. They are only available in the SparkSession API
  // version for Spark Connect.
  classOf[SparkSession.Builder]
    .getDeclaredMethod("remote", classOf[String])
    .invoke(builder, uri)

  // A set of identifiers for this application (to be used later).
  val scAppId = s"spark-connect-${UUID.randomUUID()}"
  val airflowTaskId = Option(System.getenv("AIRFLOW_TASK_ID"))
    .getOrElse("unknown_airflow_task_id")

  val session = builder
    .config("spark.joom.scAppId", scAppId)
    .config("spark.joom.airflowTaskId", airflowTaskId)
    .getOrCreate()

  // If the client application uses your Scala code (e.g., custom UDFs),
  // then you must add the jar artifact containing this code so that it
  // can be used on the Spark Connect server side.
  val addArtifact = Option(System.getenv("ADD_ARTIFACT_TO_SC_SESSION"))
    .forall(_.toBoolean)

  if (addArtifact) {
    val mainApplicationFilePath =
      System.getenv("SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH")
    classOf[SparkSession]
      .getDeclaredMethod("addArtifact", classOf[String])
      .invoke(session, mainApplicationFilePath)
  }

  Runtime.getRuntime.addShutdownHook(new Thread() {
    override def run(): Unit = {
      session.close()
    }
  })

  session
}

In Spark Connect mode, this client code can be run as a regular Java application anywhere. Since we use Kubernetes, it runs in a Docker container. All dependencies specific to Spark Connect are packed into a Docker image used to run client applications (a minimal example of this image can be found here). The image contains not only the spark-connect-client-jvm artifact but also other common dependencies used by almost all client applications (e.g., hadoop-aws, since we almost always interact with S3 storage on the client side).

FROM openjdk:11-jre-slim

WORKDIR /app

# Here, we copy the common artifacts required by any of our Spark Connect
# clients (mainly spark-connect-client-jvm, as well as spark-hive,
# hadoop-aws, scala-library, etc.).
COPY build/libs/* /app/

COPY src/main/docker/entrypoint.sh /app/
RUN chmod +x ./entrypoint.sh
ENTRYPOINT ["./entrypoint.sh"]

This common Docker image is used to run all our client applications when they run via Spark Connect. At the same time, it does not contain the client JARs with the code of particular applications and their dependencies, because there are many such applications that are constantly updated and may depend on arbitrary third-party libraries. Instead, when a particular client application is launched, the location of its JAR file is passed via an environment variable, and that JAR is downloaded during initialization in entrypoint.sh:

#!/bin/bash
set -eo pipefail

# This variable will also be used in the SparkSession builder within
# the application code.
export SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH="/tmp/$(uuidgen).jar"

# Download the JAR with the code and specific dependencies of the client
# application to be run. All such JAR files are stored in S3, and when
# creating a client Pod, the path to the required JAR is passed to it
# via environment variables.
java -cp "/app/*" com.joom.analytics.sc.client.S3Downloader \
  ${MAIN_APPLICATION_FILE_S3_PATH} ${SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH}

# Launch the client application. Any MAIN_CLASS initializes a SparkSession
# at the beginning of its execution using the code provided above.
java -cp ${SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH}:"/app/*" ${MAIN_CLASS} "$@"
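The actual implementation of the S3Downloader helper is not shown in this article. As a rough sketch, assuming it simply copies an object from S3 to a local path using the Hadoop FileSystem API (hadoop-aws is already on the classpath of the image), it could look something like this:

// Hypothetical sketch of the downloader; the real implementation may differ.
package com.joom.analytics.sc.client

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object S3Downloader {
  def main(args: Array[String]): Unit = {
    val Array(s3Path, localPath) = args
    val fs = FileSystem.get(new URI(s3Path), new Configuration())
    // Copy the application JAR from S3 to the local file system of the Pod.
    fs.copyToLocalFile(new Path(s3Path), new Path(localPath))
  }
}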

Finally, when it comes time to launch the application, our custom SparkAirflowOperator automatically determines the execution mode (Spark Connect or separate) based on the statistics of previous runs of this application.

  • In the case of Spark Connect, we use KubernetesPodOperator to launch the client Pod of the application. KubernetesPodOperator takes as parameters the previously described Docker image, as well as the environment variables (MAIN_CLASS, JAR_PATH, and others), which will be available for use inside entrypoint.sh and the application code. There is no need to allocate many resources to the client Pod (for example, its typical consumption in our environment is 200 MB of memory and 0.15 vCPU).
  • In the case of a separate Spark application, we use our custom AirflowOperator, which runs Spark applications using spark-on-k8s-operator and the official Spark Docker image. Let's skip the details of our Spark AirflowOperator for now, as it is a large topic deserving a separate article.

Not all existing Spark applications can be successfully executed on Spark Connect, since its SparkSession API differs from the SparkSession API used for separate Spark applications. For example, if your code uses sparkSession.sparkContext or sparkSession.sessionState, it will fail in the Spark Connect client because the Spark Connect version of SparkSession does not have these properties.

In our case, the most common cause of problems was the use of sparkSession.sessionState.catalog and sparkSession.sparkContext.hadoopConfiguration. In some cases, sparkSession.sessionState.catalog can be replaced with sparkSession.catalog, but not always. sparkSession.sparkContext.hadoopConfiguration may be needed if the code executed on the client side contains operations on your data storage, such as this:

def delete(path: Path, recursive: Boolean = true)
          (implicit hadoopConfig: Configuration): Boolean = {
  val fs = path.getFileSystem(hadoopConfig)
  fs.delete(path, recursive)
}

Luckily, it’s potential to create a standalone SessionCatalog to be used inside the Spark Join shopper. On this case, the category path of the Spark Join shopper should additionally embrace org.apache.spark:spark-hive_2.12, in addition to libraries for interacting together with your storage (since we use S3, so in our case, it’s org.apache.hadoop:hadoop-aws).

import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.hive.StandaloneHiveExternalCatalog
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, SessionCatalog}

// This is just an example of what the required properties might look like.
// All of them should already be set for existing Spark applications in one
// way or another, and their full list can be found in the UI of any
// running separate Spark application on the Environment tab.
val sessionCatalogConfig = Map(
  "spark.hadoop.hive.metastore.uris" -> "thrift://metastore.spark:9083",
  "spark.sql.catalogImplementation" -> "hive",
  "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

val hadoopConfig = Map(
  "hive.metastore.uris" -> "thrift://metastore.spark:9083",
  "fs.s3.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
  "fs.s3a.aws.credentials.provider" -> "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
  "fs.s3a.endpoint" -> "s3.amazonaws.com",
  // and others...
)

def createStandaloneSessionCatalog(): (SessionCatalog, Configuration) = {
  val sparkConf = new SparkConf().setAll(sessionCatalogConfig)
  val hadoopConfiguration = new Configuration()
  hadoopConfig.foreach {
    case (key, value) => hadoopConfiguration.set(key, value)
  }

  val externalCatalog = new StandaloneHiveExternalCatalog(
    sparkConf, hadoopConfiguration)
  val sessionCatalog = new SessionCatalog(
    new ExternalCatalogWithListener(externalCatalog)
  )
  (sessionCatalog, hadoopConfiguration)
}
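As a usage illustration (the database and table names below are made up), the returned SessionCatalog can stand in for sparkSession.sessionState.catalog, and the returned Configuration can be passed to storage helpers like the delete function shown earlier:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier

// Illustrative usage only: database and table names are hypothetical.
val (sessionCatalog, hadoopConfiguration) = createStandaloneSessionCatalog()

// Look up table metadata, as sparkSession.sessionState.catalog would have done.
val tableMetadata =
  sessionCatalog.getTableMetadata(TableIdentifier("some_table", Some("some_db")))
val tableLocation = new Path(tableMetadata.location)

// Pass the standalone Hadoop configuration to storage operations
// instead of sparkSession.sparkContext.hadoopConfiguration.
implicit val hadoopConfig: Configuration = hadoopConfiguration
delete(tableLocation, recursive = true)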

You also need to create a wrapper for HiveExternalCatalog accessible in your code (because the HiveExternalCatalog class is private to the org.apache.spark package):

package org.apache.spark.sql.hive

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

class StandaloneHiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
  extends HiveExternalCatalog(conf, hadoopConf)

Moreover, it’s usually potential to switch code that doesn’t work on Spark Join with another, for instance:

  • sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(data), schema) ==> sparkSession.createDataFrame(data.toList.asJava, schema)
  • sparkSession.sparkContext.getConf.get("some_property") ==> sparkSession.conf.get("some_property")
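For instance, the first replacement might look like this in context (the Row data and schema here are made up for illustration, and sparkSession is assumed to be the client session created earlier):

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative data and schema.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)
))
val data = Seq(Row(1, "a"), Row(2, "b"))

// Instead of sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(data), schema),
// which fails on Spark Connect because sparkContext is not available there:
val df = sparkSession.createDataFrame(data.toList.asJava, schema)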

Fallback to a separate Spark application

Sadly, it isn’t all the time straightforward to repair a specific Spark utility to make it work as a Spark Join shopper. For instance, third-party Spark parts used within the mission pose a major threat, as they’re usually written with out contemplating compatibility with Spark Join. Since, in our surroundings, any Spark utility may be robotically launched on Spark Join, we discovered it cheap to implement a fallback to a separate Spark utility in case of failure. Simplified, the logic is as follows:

  • If some application fails on Spark Connect, we immediately try to rerun it as a separate Spark application. At the same time, we increment the counter of failures that occurred during execution on Spark Connect (each client application has its own counter).
  • The next time this application is launched, we check its failure counter:
    – If there are fewer than 3 failures, we assume that the last time the application may have failed not because of incompatibility with Spark Connect but due to some other possible temporary cause. So, we try to run it on Spark Connect again. If it completes successfully this time, the failure counter of this client application is reset to zero.
    – If there are already 3 failures, we assume that the application cannot work on Spark Connect and stop trying to run it there for now. From then on, it is launched only as a separate Spark application.
  • If the application has 3 failures on Spark Connect, but the last one was more than 2 months ago, we try to run it on Spark Connect again (in case something has changed in it during that time, making it compatible with Spark Connect). If it succeeds this time, we reset the failure counter to zero again. If it fails again, the next attempt will be in another 2 months.
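A minimal sketch of this fallback rule, assuming the failure counter and the timestamp of the last failure are stored per application (the storage itself, and the names used here, are hypothetical):

import java.time.{Duration, Instant}

// Hypothetical per-application fallback state; persistence is omitted.
case class FallbackState(sparkConnectFailures: Int, lastFailureAt: Option[Instant])

val maxFailures = 3
val retryAfter = Duration.ofDays(60) // roughly 2 months

def shouldTrySparkConnect(state: FallbackState, now: Instant): Boolean =
  if (state.sparkConnectFailures < maxFailures) true
  else state.lastFailureAt.exists(t => Duration.between(t, now).compareTo(retryAfter) >= 0)

// Called after a run attempt on Spark Connect. On failure, the same run is
// immediately retried as a separate Spark application (not shown here).
def onSparkConnectResult(state: FallbackState, succeeded: Boolean, now: Instant): FallbackState =
  if (succeeded) state.copy(sparkConnectFailures = 0, lastFailureAt = None)
  else state.copy(sparkConnectFailures = state.sparkConnectFailures + 1, lastFailureAt = Some(now))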

This approach is much simpler than maintaining code that identifies the reasons for failures from logs, and it works well in most cases. Attempts to run incompatible applications on Spark Connect usually do not have any significant negative impact because, in the vast majority of cases, an application that is incompatible with Spark Connect fails immediately after launch without wasting time and resources. However, it is important to mention that all our applications are idempotent.

As I already mentioned, we collect Spark statistics for each Spark application (most of our platform optimizations and alerts depend on them). This is easy when the application runs as a separate Spark application. In the case of Spark Connect, the stages and tasks of each client application need to be separated from the stages and tasks of all the other client applications running concurrently on the shared Spark Connect server.

You can pass any identifiers to the Spark Connect server by setting custom properties on the client SparkSession:

val session = builder
  .config("spark.joom.scAppId", scAppId)
  .config("spark.joom.airflowTaskId", airflowTaskId)
  .getOrCreate()

Then, in the SparkListener on the Spark Connect server side, you can retrieve all the passed information and associate each stage/task with the particular client application.

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

class StatsReportingSparkListener extends SparkListener {

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    val stageId = stageSubmitted.stageInfo.stageId
    val stageAttemptNumber = stageSubmitted.stageInfo.attemptNumber()
    val scAppId = stageSubmitted.properties.getProperty("spark.joom.scAppId")
    // ...
  }
}

Right here, you’ll find the code for the StatsReportingSparkListener we use to gather statistics. You may also be all in favour of this free software for locating efficiency points in your Spark purposes.

The Spark Connect server is a permanently running Spark application on which a large number of clients can run their Jobs. Therefore, it can be worthwhile to customize its properties to make it more reliable and prevent waste of resources. Here are some settings that turned out to be useful in our case:

// Using dynamicAllocation is important for the Spark Connect server
// because the workload can be very unevenly distributed over time.
spark.dynamicAllocation.enabled: true // default: false

// This pair of parameters is responsible for the timely removal of idle
// executors:
spark.dynamicAllocation.cachedExecutorIdleTimeout: 5m // default: infinity
spark.dynamicAllocation.shuffleTracking.timeout: 5m // default: infinity

// To create new executors only when the existing ones cannot handle
// the received tasks for a significant amount of time. This allows you
// to save resources when a small number of tasks arrive at some point
// in time and do not require many executors for timely processing.
// With an increased schedulerBacklogTimeout, unnecessary executors do not
// have a chance to appear by the time all incoming tasks are
// completed. The time to complete the tasks increases slightly with this,
// but in most cases, the increase is not significant.
spark.dynamicAllocation.schedulerBacklogTimeout: 30s // default: 1s

// If, for some reason, you need to stop the execution of a client
// application (and free up resources), you can forcibly terminate the client.
// Currently, even explicitly closing the client SparkSession does not
// immediately end the execution of its corresponding Jobs on the server.
// They will continue to run for a duration equal to 'detachedTimeout'.
// Therefore, it may be reasonable to reduce it.
spark.connect.execute.manager.detachedTimeout: 2m // default: 5m

// We have encountered a situation where killed tasks may hang for
// an unpredictable amount of time, leading to bad consequences for their
// executors. In this case, it is better to remove the executor on which
// the problem occurred.
spark.task.reaper.enabled: true // default: false
spark.task.reaper.killTimeout: 300s // default: -1

// The Spark Connect server can run for an extended period of time. During
// this time, executors may fail, including for reasons beyond our control
// (e.g., AWS Spot interruptions). This option is needed to prevent
// the entire server from failing in such cases.
spark.executor.maxNumFailures: 1000

// In our experience, BroadcastJoin can lead to very serious performance
// issues in some cases. So, we decided to disable broadcasting.
// Disabling this option usually does not result in a noticeable performance
// degradation for our typical applications anyway.
spark.sql.autoBroadcastJoinThreshold: -1 // default: 10MB

// For many of our client applications, we have to add an artifact to
// the client session (method sparkSession.addArtifact()).
// Using 'useFetchCache=true' results in double space consumption for
// the application JAR files on executors' disks, as they are also duplicated
// in a local cache folder. Sometimes, this even causes disk overflow with
// subsequent problems for the executor.
spark.files.useFetchCache: false // default: true

// To ensure fair resource allocation when multiple applications are
// running concurrently.
spark.scheduler.mode: FAIR // default: FIFO

For example, after we adjusted the idle timeout properties, resource utilization changed as follows:

Image by author

Preventive restart

In our environment, the Spark Connect server (version 3.5) can become unstable after a few days of continuous operation. Most often, we face client application jobs that randomly hang for an indefinite amount of time, but there may be other problems as well. Also, over time, the probability of a random failure of the entire Spark Connect server increases dramatically, and this can happen at the wrong moment.

As this component evolves, it will likely become more stable (or we will find out that we have done something wrong in our Spark Connect setup). But at the moment, the simplest solution has turned out to be a daily preventive restart of the Spark Connect server at a suitable moment (i.e., when no client applications are running on it). An example of what the restart code might look like can be found here.

In this article, I described our experience using Spark Connect to run a large number of diverse Spark applications.

To summarize the above:

  • This component can help save resources and reduce the wait time for the execution of Spark client applications.
  • It is better to be careful about which applications are run on the shared Spark Connect server, as resource-intensive applications may cause problems for the entire system.
  • You can create an infrastructure for launching client applications so that the decision on how to run any application (either as a separate Spark application or as a Spark Connect client) is made automatically at the moment of launch.
  • It is important to note that not all applications will be able to run on Spark Connect, but the number of such cases can be significantly reduced. If there is a possibility of running applications that have not been tested for compatibility with the Spark Connect version of the SparkSession API, it is worth implementing a fallback to separate Spark applications.
  • It is worth paying attention to the Spark properties that can improve resource utilization and the overall stability of the Spark Connect server. It may also be reasonable to set up a periodic preventive restart of the Spark Connect server to reduce the probability of accidental failures and unwanted behavior.

Overall, we have had a positive experience using Spark Connect in our company. We will continue to watch the development of this technology with great interest, and we plan to expand its use.