Initial commit of spark benchmark framework (GoogleCloudPlatform#995)
hildrum committed Jun 3, 2016
1 parent 2d72fd8 commit 254ede1
Showing 16 changed files with 580 additions and 26 deletions.
27 changes: 26 additions & 1 deletion perfkitbenchmarker/benchmark_spec.py
@@ -31,6 +31,7 @@
from perfkitbenchmarker import os_types
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import providers
from perfkitbenchmarker import spark_service
from perfkitbenchmarker import static_virtual_machine as static_vm
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker import vm_util
@@ -86,6 +87,7 @@ def __init__(self, benchmark_config, benchmark_name, benchmark_uid):
      benchmark_name: string. Name of the benchmark.
      benchmark_uid: An identifier unique to this run of the benchmark even
          if the same benchmark is run multiple times with different configs.
      spark_service: The spark service configured for this benchmark.
    """
    self.config = benchmark_config
    self.name = benchmark_name
@@ -100,6 +102,7 @@ def __init__(self, benchmark_config, benchmark_name, benchmark_uid):
    self.file_name = os.path.join(vm_util.GetTempDir(), self.uid)
    self.uuid = str(uuid.uuid4())
    self.always_call_cleanup = False
    self.spark_service = None

    # Set the current thread's BenchmarkSpec object to this one.
    context.SetThreadBenchmarkSpec(self)
@@ -228,6 +231,24 @@ def ConstructVirtualMachines(self):
      self.vm_groups[group_name] = vms
      self.vms.extend(vms)

  def ConstructSparkService(self):
    """Creates the spark_service object if the config specifies one."""
    if self.config.spark_service is None:
      return

    spark_service_spec = self.config.spark_service
    providers.LoadProvider(spark_service_spec.cloud)
    service_type = spark_service_spec.spark_service_type
    spark_service_class = spark_service.GetSparkServiceClass(
        spark_service_spec.cloud, service_type)
    if spark_service_spec.static_cluster_name:
      name = spark_service_spec.static_cluster_name
      static_cluster = True
    else:
      name = 'pkb-' + FLAGS.run_uri
      static_cluster = False
    self.spark_service = spark_service_class(name, static_cluster,
                                             spark_service_spec)

  def Prepare(self):
    targets = [(vm.PrepareBackgroundWorkload, (), {}) for vm in self.vms]
    vm_util.RunParallelThreads(targets, len(targets))
@@ -254,12 +275,16 @@ def Provision(self):
      sshable_vm_groups[group_name] = [vm for vm in group_vms
                                       if vm.OS_TYPE != os_types.WINDOWS]
    vm_util.GenerateSSHConfig(sshable_vms, sshable_vm_groups)

    if self.spark_service:
      self.spark_service.Create()

  def Delete(self):
    if self.deleted:
      return

    if self.spark_service:
      self.spark_service.Delete()

    if self.vms:
      try:
        vm_util.RunThreaded(self.DeleteVm, self.vms)
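ConstructSparkService above resolves a concrete service class via spark_service.GetSparkServiceClass. That module is part of this commit but not shown on this page; the following is a minimal sketch of how such a (cloud, service type) registry lookup could work. All names and the registry layout here are assumptions, not the actual implementation:

# Hypothetical sketch only; the actual perfkitbenchmarker/spark_service.py
# added by this commit may be organized differently.
PROVIDER_MANAGED = 'managed'
PKB_MANAGED = 'pkb_managed'

_SPARK_SERVICE_REGISTRY = {}


def RegisterSparkServiceClass(cloud, service_type, cls):
  """Registers cls as the spark service for the (cloud, service_type) pair."""
  _SPARK_SERVICE_REGISTRY[(cloud, service_type)] = cls


def GetSparkServiceClass(cloud, service_type):
  """Looks up the registered spark service class; raises if none exists."""
  key = (cloud, service_type)
  if key not in _SPARK_SERVICE_REGISTRY:
    raise ValueError('No spark service registered for %s/%s' % key)
  return _SPARK_SERVICE_REGISTRY[key]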
101 changes: 99 additions & 2 deletions perfkitbenchmarker/configs/benchmark_config_spec.py
@@ -28,6 +28,7 @@
from perfkitbenchmarker import os_types
from perfkitbenchmarker import providers
from perfkitbenchmarker import spark_service
from perfkitbenchmarker import static_virtual_machine
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker.configs import option_decoders
from perfkitbenchmarker.configs import spec
@@ -163,6 +164,73 @@ def __init__(self, **kwargs):
        default=list, item_decoder=_StaticVmDecoder(), **kwargs)


class _SparkServiceSpec(spec.BaseSpec):
  """Configurable options of an Apache Spark service.

  We may add more options here, such as disk specs, as necessary.

  Attributes:
    spark_service_type: string. pkb_managed or managed_service.
    num_workers: number of workers.
    machine_type: machine type to use.
    cloud: cloud to use.
    project: project to use.
  """

  def __init__(self, component_full_name, flag_values=None, **kwargs):
    super(_SparkServiceSpec, self).__init__(component_full_name,
                                            flag_values=flag_values,
                                            **kwargs)

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Gets decoder classes and constructor args for each configurable option.

    Returns:
      dict. Maps option name string to a (ConfigOptionDecoder class, dict)
      pair. The pair specifies a decoder class and its __init__() keyword
      arguments to construct in order to decode the named option.
    """
    result = super(_SparkServiceSpec, cls)._GetOptionDecoderConstructions()
    result.update({
        'static_cluster_name': (option_decoders.StringDecoder,
                                {'default': None, 'none_ok': True}),
        'spark_service_type': (option_decoders.EnumDecoder, {
            'default': spark_service.PROVIDER_MANAGED,
            'valid_values': [spark_service.PROVIDER_MANAGED,
                             spark_service.PKB_MANAGED]}),
        'num_workers': (option_decoders.IntDecoder, {
            'default': 3, 'min': 1}),
        'cloud': (option_decoders.EnumDecoder, {
            'valid_values': providers.VALID_CLOUDS}),
        'project': (option_decoders.StringDecoder, {'default': None,
                                                    'none_ok': True}),
        'machine_type': (option_decoders.StringDecoder, {'default': None,
                                                         'none_ok': True})})
    return result

  @classmethod
  def _ApplyFlags(cls, config_values, flag_values):
    """Modifies config options based on runtime flag values.

    Can be overridden by derived classes to add support for specific flags.

    Args:
      config_values: dict mapping config option names to provided values. May
        be modified by this function.
      flag_values: flags.FlagValues. Runtime flags that may override the
        provided config values.
    """
    super(_SparkServiceSpec, cls)._ApplyFlags(config_values, flag_values)
    if flag_values['cloud'].present or 'cloud' not in config_values:
      config_values['cloud'] = flag_values.cloud
    if flag_values['project'].present or 'project' not in config_values:
      config_values['project'] = flag_values.project
    if flag_values['spark_static_cluster_name'].present:
      config_values['static_cluster_name'] = (
          flag_values.spark_static_cluster_name)


class _VmGroupSpec(spec.BaseSpec):
  """Configurable options of a VM group.
@@ -286,6 +354,33 @@ def Decode(self, value, component_full_name, flag_values):
    return result


class _SparkServiceDecoder(option_decoders.TypeVerifier):
  """Validates the spark_service dictionary of a benchmark config object."""

  def __init__(self, **kwargs):
    super(_SparkServiceDecoder, self).__init__(valid_types=(dict,), **kwargs)

  def Decode(self, value, component_full_name, flag_values):
    """Verifies the spark_service dictionary of a benchmark config object.

    Args:
      value: dict. Spark service config dictionary.
      component_full_name: string. Fully qualified name of the configurable
        component containing the config option.
      flag_values: flags.FlagValues. Runtime flag values to be propagated to
        BaseSpec constructors.

    Returns:
      _SparkServiceSpec built from the config passed in via value.

    Raises:
      errors.Config.InvalidValue upon invalid input value.
    """
    spark_service_config = super(_SparkServiceDecoder, self).Decode(
        value, component_full_name, flag_values)
    result = _SparkServiceSpec(self._GetOptionFullName(component_full_name),
                               flag_values, **spark_service_config)
    return result



class BenchmarkConfigSpec(spec.BaseSpec):
  """Configurable options of a benchmark run.
@@ -316,7 +411,8 @@ def __init__(self, component_full_name, expected_os_types=None, **kwargs):
    for group_name, group_spec in sorted(self.vm_groups.iteritems()):
      if group_spec.os_type not in expected_os_types:
        mismatched_os_types.append('{0}.vm_groups[{1}].os_type: {2}'.format(
-           component_full_name, repr(group_name), repr(group_spec.os_type)))
+           component_full_name, repr(group_name),
+           repr(group_spec.os_type)))
    if mismatched_os_types:
      raise errors.Config.InvalidValue(
          'VM groups in {0} may only have the following OS types: {1}. The '
@@ -341,7 +437,8 @@ def _GetOptionDecoderConstructions(cls):
    result.update({
        'description': (option_decoders.StringDecoder, {'default': None}),
        'flags': (_FlagsDecoder, {}),
-       'vm_groups': (_VmGroupsDecoder, {})})
+       'vm_groups': (_VmGroupsDecoder, {'default': {}}),
+       'spark_service': (_SparkServiceDecoder, {'default': None})})
    return result

  def _DecodeAndInit(self, component_full_name, config, decoders, flag_values):
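Taken together, _SparkServiceSpec and _SparkServiceDecoder let a benchmark config carry a spark_service block. A hypothetical user config exercising the options decoded above might look like the following (all values are illustrative, and per _ApplyFlags the --cloud and --project flags, when set, override the cloud and project entries):

spark:
  spark_service:
    spark_service_type: managed     # or pkb_managed
    num_workers: 8                  # default 3, minimum 1
    cloud: GCP                      # must be in providers.VALID_CLOUDS
    project: my-example-project     # hypothetical project id
    machine_type: n1-standard-4     # optional; defaults to None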
2 changes: 1 addition & 1 deletion perfkitbenchmarker/errors.py
@@ -177,7 +177,7 @@ class UnrecognizedOption(Error):


class Juju(object):
-  """Errors related to the Juju OS_TYPE"""
+  """Errors related to the Juju OS_TYPE."""
  class TimeoutException(Error):
    pass

@@ -48,7 +48,7 @@


def GetConfig(user_config):
-  """ Reads the config file and overwrites vm_count with num_vms"""
+  """Reads the config file and overwrites vm_count with num_vms."""

  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  if FLAGS['num_vms'].present:
@@ -141,7 +141,7 @@ class DBStatusQueryError(Exception):


def _GenerateRandomPassword():
-  """ Generates a random password to be used by the DB instance.
+  """Generates a random password to be used by the DB instance.

  Args:
    None

  Returns:
@@ -285,7 +285,7 @@ def _GetSysbenchCommandPrefix(os_type):


def _IssueSysbenchCommand(vm, duration):
-  """ Issues a sysbench run command given a vm and a duration.
+  """Issues a sysbench run command given a vm and a duration.

  Does nothing if duration is <= 0.
@@ -330,7 +330,7 @@ def _IssueSysbenchCommand(vm, duration):


def _RunSysbench(vm, metadata):
-  """ Runs the Sysbench OLTP test.
+  """Runs the Sysbench OLTP test.

  The test is run on the DB instance as indicated by the
  vm.db_instance_address.
@@ -678,7 +678,7 @@ class GoogleCloudSQLBenchmark(object):
"""MySQL benchmark based on the Google Cloud SQL service."""

def Prepare(self, vm):
"""Prepares the DB and everything for the provider GCP (Cloud SQL)
"""Prepares the DB and everything for the provider GCP (Cloud SQL).
Args:
vm: The VM to be used as the test client
104 changes: 104 additions & 0 deletions perfkitbenchmarker/linux_benchmarks/spark_benchmark.py
@@ -0,0 +1,104 @@
# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a jar using a cluster that supports Apache Spark.
This benchmark takes a jarfile and class name, and runs that class
using an Apache Spark cluster. The Apache Spark cluster can be one
supplied by a cloud provider, such as Google's Dataproc.
By default, it runs SparkPi.
It records how long the job takes to run.
For more on Apache Spark, see: http://spark.apache.org/
"""

import datetime
import logging

from perfkitbenchmarker import configs
from perfkitbenchmarker import flags
from perfkitbenchmarker import sample


BENCHMARK_NAME = 'spark'
BENCHMARK_CONFIG = """
spark:
description: Run a jar on a spark cluster.
spark_service:
spark_service_type: managed
num_workers: 4
"""

# This points to a file on the spark cluster.
DEFAULT_JARFILE = 'file:///usr/lib/spark/lib/spark-examples.jar'
DEFAULT_CLASSNAME = 'org.apache.spark.examples.SparkPi'

flags.DEFINE_string('spark_jarfile', DEFAULT_JARFILE,
                    'Jarfile to submit.')
flags.DEFINE_string('spark_classname', DEFAULT_CLASSNAME,
                    'Classname to be used.')
flags.DEFINE_string('spark_static_cluster_name', None,
                    'If set, the name of the Spark cluster, assumed to be '
                    'ready.')

FLAGS = flags.FLAGS


def GetConfig(user_config):
  return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)


def Prepare(benchmark_spec):
  pass


def Run(benchmark_spec):
  """Executes the given jar on the specified Spark cluster.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
        required to run the benchmark.

  Returns:
    A list of sample.Sample objects.
  """
  spark_cluster = benchmark_spec.spark_service
  jar_start = datetime.datetime.now()
  stdout, stderr, retcode = spark_cluster.SubmitJob(FLAGS.spark_jarfile,
                                                    FLAGS.spark_classname)
  jar_end = datetime.datetime.now()
  logging.info('Jar result is %s', stdout)

  # dict.update() returns None, so build the metadata dict in two steps
  # rather than chaining update() onto GetMetadata().
  metadata = spark_cluster.GetMetadata()
  metadata.update({'jarfile': FLAGS.spark_jarfile,
                   'class': FLAGS.spark_classname})

  results = []
  results.append(sample.Sample('jar_time',
                               (jar_end - jar_start).total_seconds(),
                               'seconds', metadata))

  if not spark_cluster.user_managed:
    create_time = (spark_cluster.create_end_time -
                   spark_cluster.create_start_time)
    results.append(sample.Sample('cluster_create_time', create_time, 'seconds',
                                 metadata))

  return results


def Cleanup(benchmark_spec):
  pass
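With spark_benchmark.py in place, the benchmark would be driven through PKB's usual entry point. The flag names below are the ones defined in this file; the exact command shape is an assumption based on PKB's standard pkb.py runner rather than something shown in this commit:

# Run the default SparkPi class on a freshly created managed cluster:
./pkb.py --benchmarks=spark --cloud=GCP

# Run a custom class from a jar on an existing cluster (hypothetical values):
./pkb.py --benchmarks=spark \
    --spark_static_cluster_name=my-cluster \
    --spark_jarfile=file:///path/to/my-job.jar \
    --spark_classname=com.example.MyJob

When --spark_static_cluster_name is set, ConstructSparkService treats the cluster as static, so the cluster_create_time sample is presumably skipped via the user_managed check in Run.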
2 changes: 1 addition & 1 deletion perfkitbenchmarker/linux_virtual_machine.py
@@ -566,7 +566,7 @@ def BurnCpu(self, burn_cpu_threads=None, burn_cpu_seconds=None):
    self.RemoteCommand('pkill -9 sysbench')

  def PrepareBackgroundWorkload(self):
-    """Install packages needed for the background workload """
+    """Install packages needed for the background workload."""
    if self.background_cpu_threads:
      self.Install('sysbench')
    if self.background_network_mbits_per_sec: