Cafe lights. (source: Pixabay) While Spark ML pipelines have a wide variety of algorithms, you may find yourself wanting additional functionality without having to leave the pipeline model. In Spark MLlib, this isn’t much of a problem—you can manually implement your algorithm with RDD transformations and keep going from there. For Spark ML pipelines, the same approach can work, but we lose some of the nicely integrated properties of the pipeline, including the ability to automatically run meta-algorithms, such as cross-validation parameter search. In this article, you will learn how to extend the Spark ML pipeline model using the standard wordcount example as a starting point (one can never really escape the intro to big data wordcount example).
To add your own algorithm to a Spark pipeline, you need to implement either Estimator or Transformer, which implements the PipelineStage interface. For algorithms that don’t require training, you can implement the Transformer interface, and for algorithms with training you can implement the Estimator interface—both in org.apache.spark.ml (both of which implement the base PipelineStage). Note that training is not limited to complicated machine learning models; even the MinMaxScaler requires training to determine the range. If they need training, they must be constructed as Estimator rather than Transformer.
Note
Using PipelineStage directly does not work, since inside of the pipeline fitting reflection is used, which assumes all stages are either an Estimator or a Transformer.
In addition to the obvious transform or fit function, all pipeline stages need to provide transformSchema, and a copy constructor or implement a class, which provides these for you—copy is used to make a copy of the current stage, with any newly specified params merged in, and can simply be called defaultCopy (unless your class has special constructor considerations).
The start of a pipeline stage, as well as the copy delegation, is shown—transformSchema must produce what the expected output of your pipeline stage is based on any parameters set and an input schema. Most pipeline stages simply add new fields; very few drop previous fields in case they are needed, but this can sometimes result in records containing more data than is required downstream, negatively impacting performance. If you find this is a problem in your pipeline, you can create your own stage to drop unnecessary fields.
classHardCodedWordCountStage(overridevaluid:String)extendsTransformer{defthis()=this(Identifiable.randomUID("hardcodedwordcount"))defcopy(extra:ParamMap):HardCodedWordCountStage={defaultCopy(extra)}
In addition to producing the output schema, the transformSchema function should validate that the input schema is suitable for the stage (e.g., the input column is of the expected type).
This is also where you should perform validation on your stages parameters.
A simple transformSchema for string inputs and a vector output, with hard coded input and output columns, is illustrated as follows.
overridedeftransformSchema(schema:StructType):StructType={// Check that the input type is a stringvalidx=schema.fieldIndex("happy_pandas")valfield=schema.fields(idx)if(field.dataType!=StringType){thrownewException(s"Input type${field.dataType}did not match input type StringType")}// Add the return fieldschema.add(StructField("happy_panda_counts",IntegerType,false))}
Algorithms that do not require training can be implemented very simply using the Transformer interface. Since this is the simplest pipeline stage, you can start with implementing a simple transformer, which counts the number of words on the input column.
deftransform(df:Dataset[_]):DataFrame={valwordcount=udf{in:String=>in.split(" ").size}df.select(col("*"),wordcount(df.col("happy_pandas")).as("happy_panda_counts"))}
To get the most of the pipeline interface, you will want to make your pipeline stage configurable using the params interface.
While the params interface is public, sadly the common default params that are commonly used inside of Spark are private, so you will end up with some amount of code duplication. In addition to allowing users to specify values, parameters can also contain some basic validation logic (e.g., the regularization parameter must be set to a non-negative value). The two most common parameters are input column and output column, which you can add to your model relatively simply.
In addition to string params, any other type can be used, including lists of strings for things like stop words, and doubles for things like stop words.
classConfigurableWordCount(overridevaluid:String)extendsTransformer{finalvalinputCol=newParam[String](this,"inputCol","The input column")finalvaloutputCol=newParam[String](this,"outputCol","The output column");defsetInputCol(value:String):this.type=set(inputCol,value)defsetOutputCol(value:String):this.type=set(outputCol,value)defthis()=this(Identifiable.randomUID("configurablewordcount"))defcopy(extra:ParamMap):HardCodedWordCountStage={defaultCopy(extra)}overridedeftransformSchema(schema:StructType):StructType={// Check that the input type is a stringvalidx=schema.fieldIndex($(inputCol))valfield=schema.fields(idx)if(field.dataType!=StringType){thrownewException(s"Input type${field.dataType}did not match input type StringType")}// Add the return fieldschema.add(StructField($(outputCol),IntegerType,false))}deftransform(df:Dataset[_]):DataFrame={valwordcount=udf{in:String=>in.split(" ").size}df.select(col("*"),wordcount(df.col($(inputCol))).as($(outputCol)))}}
Algorithms that do require training can be implemented using the Estimator interface—although, for many algorithms, the org.apache.spark.ml.Predictor or org.apache.spark.ml.classificationClassifier helper classes are easier to implement. The primary difference between the Estimator and Transformer interfaces is that rather than directly expressing your transformation on the input, you will first have a training step in the form of a train function. A string indexer is one of the simplest estimators you can implement, and while it’s already available in Spark, is still a good illustration of how to use the estimator interface.
traitSimpleIndexerParamsextendsParams{finalvalinputCol=newParam[String](this,"inputCol","The input column")finalvaloutputCol=newParam[String](this,"outputCol","The output column")}classSimpleIndexer(overridevaluid:String)extendsEstimator[SimpleIndexerModel]withSimpleIndexerParams{defsetInputCol(value:String)=set(inputCol,value)defsetOutputCol(value:String)=set(outputCol,value)defthis()=this(Identifiable.randomUID("simpleindexer"))overridedefcopy(extra:ParamMap):SimpleIndexer={defaultCopy(extra)}overridedeftransformSchema(schema:StructType):StructType={// Check that the input type is a stringvalidx=schema.fieldIndex($(inputCol))valfield=schema.fields(idx)if(field.dataType!=StringType){thrownewException(s"Input type${field.dataType}did not match input type StringType")}// Add the return fieldschema.add(StructField($(outputCol),IntegerType,false))}overridedeffit(dataset:Dataset[_]):SimpleIndexerModel={importdataset.sparkSession.implicits._valwords=dataset.select(dataset($(inputCol)).as[String]).distinct.collect()newSimpleIndexerModel(uid,words);}}classSimpleIndexerModel(overridevaluid:String,words:Array[String])extendsModel[SimpleIndexerModel]withSimpleIndexerParams{overridedefcopy(extra:ParamMap):SimpleIndexerModel={defaultCopy(extra)}privatevallabelToIndex:Map[String,Double]=words.zipWithIndex.map{case(x,y)=>(x,y.toDouble)}.toMapoverridedeftransformSchema(schema:StructType):StructType={// Check that the input type is a stringvalidx=schema.fieldIndex($(inputCol))valfield=schema.fields(idx)if(field.dataType!=StringType){thrownewException(s"Input type${field.dataType}did not match input type StringType")}// Add the return fieldschema.add(StructField($(outputCol),IntegerType,false))}overridedeftransform(dataset:Dataset[_]):DataFrame={valindexer=udf{label:String=>labelToIndex(label)}dataset.select(col("*"),indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol)))}}
If you are implementing an iterative algorithm, you may wish to consider caching the input data automatically if it’s not already cached, or allow the user to specify a persistence level.
The Predictor interface adds the two most common parameters (input and output columns) as labels column, features column, and prediction column—and automatically handles the schema transformation for us.
The Classifier interface does much the same, except it also adds a rawPredictionColumn and provides tools to detect the number of classes (getNumClasses) as well as convert the input DataFrame to an RDD of LabeledPoints (making it easier to wrap legacy MLlib classification algorithms).
If you are implementing a regression or clustering interface, there is no public base set of interfaces to use, so you will need to use the generic Estimator interface.
// Simple Bernouli Naive Bayes classifier - no sanity checks for brevity// Example only - not for production use.classSimpleNaiveBayes(valuid:String)extendsClassifier[Vector,SimpleNaiveBayes,SimpleNaiveBayesModel]{defthis()=this(Identifiable.randomUID("simple-naive-bayes"))overridedeftrain(ds:Dataset[_]):SimpleNaiveBayesModel={importds.sparkSession.implicits._ds.cache()// Note: you can use getNumClasses and extractLabeledPoints to get an RDD instead// Using the RDD approach is common when integrating with legacy machine learning code// or iterative algorithms which can create large query plans.// Here we use Datasets since neither of those apply.// Compute the number of documentsvalnumDocs=ds.count// Get the number of classes.// Note this estimator assumes they start at 0 and go to numClassesvalnumClasses=getNumClasses(ds)// Get the number of features by peaking at the first rowvalnumFeatures:Integer=ds.select(col($(featuresCol))).head.get(0).asInstanceOf[Vector].size// Determine the number of records for each classvalgroupedByLabel=ds.select(col($(labelCol)).as[Double]).groupByKey(x=>x)valclassCounts=groupedByLabel.agg(count("*").as[Long]).sort(col("value")).collect().toMap// Select the labels and features so we can more easily map over them.// Note: we do this as a DataFrame using the untyped API because the Vector// UDT is no longer public.valdf=ds.select(col($(labelCol)).cast(DoubleType),col($(featuresCol)))// Figure out the non-zero frequency of each feature for each label and// output label index pairs using a case clas to make it easier to work with.vallabelCounts:Dataset[LabeledToken]=df.flatMap{caseRow(label:Double,features:Vector)=>features.toArray.zip(Streamfrom1).filter{vIdx=>vIdx._2==1.0}.map{case(v,idx)=>LabeledToken(label,idx)}}// Use the typed Dataset aggregation API to count the number of non-zero// features for each label-feature index.valaggregatedCounts:Array[((Double,Integer),Long)]=labelCounts.groupByKey(x=>(x.label,x.index)).agg(count("*").as[Long]).collect()valtheta=Array.fill(numClasses)(newArray[Double](numFeatures))// Compute the denominator for the general prioirsvalpiLogDenom=math.log(numDocs+numClasses)// Compute the priors for each classvalpi=classCounts.map{case(_,cc)=>math.log(cc.toDouble)-piLogDenom}.toArray// For each label/feature update the probabilitiesaggregatedCounts.foreach{case((label,featureIndex),count)=>// log of number of documents for this label + 2.0 (smoothing)valthetaLogDenom=math.log(classCounts.get(label).map(_.toDouble).getOrElse(0.0)+2.0)theta(label.toInt)(featureIndex)=math.log(count+1.0)-thetaLogDenom}// Unpersist now that we are done computing everythingds.unpersist()// Construct a modelnewSimpleNaiveBayesModel(uid,numClasses,numFeatures,Vectors.dense(pi),newDenseMatrix(numClasses,theta(0).length,theta.flatten,true))}overridedefcopy(extra:ParamMap)={defaultCopy(extra)}}// Simplified Naive Bayes ModelcaseclassSimpleNaiveBayesModel(overridevaluid:String,overridevalnumClasses:Int,overridevalnumFeatures:Int,valpi:Vector,valtheta:DenseMatrix)extendsClassificationModel[Vector,SimpleNaiveBayesModel]{overridedefcopy(extra:ParamMap)={defaultCopy(extra)}// We have to do some tricks here because we are using Spark's// Vector/DenseMatrix calculations - but for your own model don't feel// limited to Spark's native ones.valnegThetaArray=theta.values.map(v=>math.log(1.0-math.exp(v)))valnegTheta=newDenseMatrix(numClasses,numFeatures,negThetaArray,true)valthetaMinusNegThetaArray=theta.values.zip(negThetaArray).map{case(v,nv)=>v-nv}valthetaMinusNegTheta=newDenseMatrix(numClasses,numFeatures,thetaMinusNegThetaArray,true)valonesVec=Vectors.dense(Array.fill(theta.numCols)(1.0))valnegThetaSum:Array[Double]=negTheta.multiply(onesVec).toArray// Here is the prediciton functionality you need to implement - for ClassificationModels// transform automatically wraps this - but if you might benefit from broadcasting your model or// other optimizations you can also override transform.defpredictRaw(features:Vector):Vector={// Toy implementation - use BLAS or similar instead// the summing of the three vectors but the functionality isn't exposed.Vectors.dense(thetaMinusNegTheta.multiply(features).toArray.zip(pi.toArray).map{case(x,y)=>x+y}.zip(negThetaSum).map{case(x,y)=>x+y})}}
Note
If you simply need to modify an existing algorithm, you can (by pretending to be in the org.apache.spark project) extend it.
Now you know how to extend Spark’s ML Pipeline API with your own stages. If you get lost, a good reference is the algorithms inside of Spark’s itself—while they do sometimes use internal APIs, for the most part they implement public interfaces in the same way that you will want to.
