Fractal piping
Fractal piping (source: Pixabay)

In part one of this blog series, we introduced and then trained models for tokenization and part-of-speech tagging using two libraries—John Snow Labs’ NLP for Apache Spark and Explosion AI’s spaCy. In this part, we'll go over building and running a natural language processing (NLP) pipeline that applies these models to new free-text data.

Plugging in our data is a challenging step since our data is made of unformatted, non-sentence-bounded text that is raw and heterogeneous. I am working with a folder full of .txt files, and need to save the results in word-tag format, by filename so I can compare it to the correct answers later. Let’s work it out:


start = time.time()
path = "./target/testing/"
files = sorted([path + f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])

prediction = {}
for file in files:
    fo =, mode='r', encoding='utf-8')
    content = []
    for doc in nlp_model(re.sub("\\s+", ' ',
        content.append((doc.text, doc.tag_))
    prediction[file] = content
print (time.time() - start)

Another way of computing this, in parallel, would be using generators and spaCy’s language pipe. Something like this could work out:


from spacy.language import Language
import itertools

def genf():
    path = "./target/testing/"
    files = sorted([path + f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
    for file in files:
        fo =, mode='r', encoding='utf-8')
        t = re.sub("\\s+", ' ',
        yield (file, t)
gen1, gen2 = itertools.tee(genf())
files = (file for (file, text) in gen1)
texts = (text for (file, text) in gen2)

start = time.time()
prediction = {}
for file, doc in zip(files, nlp_model.pipe(texts, batch_size=10, n_threads=12)):
    content = []
    for d in doc:
        content.append((d.text, d.tag_))
    prediction[file] = content
print (time.time() - start)


var data ="./target/testing").as[String]
    .withColumnRenamed("value", "text")
    .withColumn("filename", regexp_replace(input_file_name(), "file://", ""))

data = data.groupBy("filename").agg(concat_ws(" ", collect_list(data("text"))).as("text"))

val files ="filename")[String].collect

val result = model.transform(data)

val prediction = Benchmark.time("Time to collect predictions") {
        .select("finished_token", "finished_pos", "filename").as[(Array[String], Array[String], String)]
        .map(wt => (wt._3,

As in Apache Spark, I can read a folder of text files with just textFile(), although it reads line by line. I need to identify the filenames per line, so I can combine them back again. Luckily for me, input_file_name() does exactly that. I then proceed to group by and concatenate the lines with a whitespace.

Note that neither of the two code snippets above use code that is unique to the NLP libraries. spaCy relies on Python’s file operations, and Spark-NLP relies on Spark’s native data set loading and processing primitives. Note that the Spark code above will work just the same on an input file of 10 kilobytes, 10 gigabytes, or 10 terabytes, if the Spark cluster is correctly configured. Also, your learning curve for each library depends on your familiarity with its library ecosystem.

Measuring the results

If what we did so far seems hard, then this one is super difficult. How can we measure the POS accuracy when the answers we have are tokenized differently? I had to pull off some magic here, and I am sure it will be controversial. There was no easy way to objectively compute the results and match them fairly, so here is what I came up with.


First, I need to process the answers folder, which is a folder full of .txt files that look exactly like the training data, with the same filenames of the testing data we used in the previous step.

answers = {}
total_words = 0
for file in result_files:
    fo =, mode='r', encoding='utf-8')
    file_tags = []
    for pair in re.split("\\s+", re.sub("\\s+", ' ',
        total_words += 1
        tag = pair.strip().split("|")
        file_tags.append((tag[0], tag[-1]))
    answers[file] = file_tags


For this one, I pulled off the same function that parses the tuples of the POS annotator. It belongs to a helper object called ResourceHelper, which has a lot of similar functions to help with data.

var total_words = 0
val answer ="testing", "answer")).map(file => {
    val content = ResourceHelper
        .parseTupleSentences(file, "TXT", '|', 500).flatMap(_.tupleWords)
    total_words += content.length
    (file, content)

And here is where the magic happens. I have the answers, which both in spaCy and in Spark-NLP look like a dictionary of (filename, array((word, tag)). And this is the same format we have in the prediction step.

So, for this, I have created a small algorithm in which I run through every pair of prediction and answer, and compare the words in it. For every matched word, I count a matched token, and for every matched tag on them, I count for a success.

But, since words are tokenized differently than in the ANC results, I need to open up a small window into which a word is given a chance to match a specific number of rolling tokens in ANC, or else let me know where to continue my search in the file.

If the predicted word is sub-tokenized (fewer tokens than in ANC), then the word will never match and is ignored—e.g., two-legged|JJ where in ANC it's two|NN – legged|JJ, then it will collect ANC tokens until it confirms it’s sub-tokenized and ignore it (construct is the collection of sub-tokens).

Then, the index is placed at the latest match, so, if the predicted word appears later (due to the previous word being sub-tokenized), it will eventually find it in range, and count it.

Here’s how it looks in code:


start = time.time()
word_matches = 0
tag_matches = 0

for file in list(prediction.keys()):
    last_match = 0
    print("analyzing: " + file)
    for pword, ptag in answers[file.replace('testing', 'answer')]:
        print("target word is: " + pword)
        last_attempt = 0
        construct = ''
        for word, tag in prediction[file][last_match:]:
            if word.strip() == '':
                last_match += 1
            construct += word
            print("against: " + word + " or completion of construct " + pword)
            last_attempt += 1
            if pword == word:
                print("word found: " + word)
                if ptag == tag:
                    print("match found: " + word + " as " + tag)
                    tag_matches += 1
                word_matches += 1
                last_match += last_attempt
            elif pword == construct:
                print(pword + " construct complete. No point in keeping the search")
                last_match += last_attempt
            elif len(pword) <= len(construct):
                print(pword + " construct larger than target. No point in keeping the search")
                if (pword in construct):
                    last_match += last_attempt
print (time.time() - start)


analyzing: ./target/testing/20000424_nyt-NEW.txt
target word is: IRAQ-POVERTY
against: IRAQ-POVERTY or completion of construct IRAQ-POVERTY
word found: IRAQ-POVERTY
target word is: (
against: ( or completion of construct (
word found: (
match found: ( as (
target word is: Washington
against: Washington or completion of construct Washington
word found: Washington
match found: Washington as NNP
target word is: )
against: ) or completion of construct )
word found: )
match found: ) as )
target word is: Rep.
against: Rep or completion of construct Rep.
against: . or completion of construct Rep.
Rep. construct complete. No point in keeping the search

As we can see, it will give tokens a chance to find a mate; this algorithm basically keeps tokens synchronized, nothing more.

print("Total words: " + str(total_words))
print("Total token matches: " + str(word_matches))
print("Total tag matches: " + str(tag_matches))
print("Simple Accuracy: " + str(tag_matches / word_matches))


Total words: 21381
Total token matches: 20491
Total tag matches: 17056
Simple Accuracy: 0.832365428724806


We see similar behavior in Spark-NLP, perhaps just a little bit more of mixed imperative and functional programming:

var misses = 0
var token_matches = 0
var tag_matches = 0
for (file <- prediction.keys) {
    var current = 0
    for ((pword, ptag) <- prediction(file)) {
        println(s"analyzing: ${pword}")
        var found = false
        val tags = answer(file.replace("testing", "answer"))
        var construct = ""
        var attempt = 0
        tags.takeRight(tags.length - current).iterator.takeWhile(_ => !found && construct != pword).foreach { case (word, tag) => {
            construct += word
            println(s"against: $word or if matches construct $construct")
            if (word == pword) {
                println(s"word match: $pword")
                token_matches += 1
                if (tag == ptag) {
                    println(s"tag match: $tag")
                    tag_matches += 1
                found = true
            else if (pword.length < construct.length) {
                if (attempt > 0) {
                    println(s"construct $construct too long for word $pword against $word")
                    attempt -= attempt
                    misses += 1
                    println(s"failed match our $pword against their $word or $construct")
                    found = true
            attempt += 1
        current += attempt


analyzing: NYT20020731.0371
against: NYT20020731.0371 or if matches construct NYT20020731.0371
word match: NYT20020731.0371
analyzing: 2002-07-31
against: 2002-07-31 or if matches construct 2002-07-31
word match: 2002-07-31
analyzing: 23:38
against: 23:38 or if matches construct 23:38
word match: 23:38
analyzing: A4917
against: A4917 or if matches construct A4917
word match: A4917
analyzing: &
against: & or if matches construct &
word match: &
tag match: CC

We then compute the measures:

println("Total words: " + (total_words))
println("Total token matches: " + (token_matches))
println("Total tag matches: " + (tag_matches))
println("Simple Accuracy: " + (tag_matches * 1.0 / token_matches))


Total words: 21362
Total token matches: 18201
Total tag matches: 15318
Simple Accuracy: 0.8416021097741883

Spark-NLP, but where’s Spark?

I haven’t made many notes regarding where Spark fits in all this. But, without boring you too much further, I’ll put everything in easy-to-read bullets:

  • You guessed it! This is an overkill for Spark. You don’t need four hammers for a single small nail. With this I mean, Spark works in distributed fashion by default, and defaults are meant for quite big data (e.g., spark.sql.shuffle.partitions is 200). In this example, I’ve made the attempt to control the data I work with, and not expand too much. “Big data” can become an enemy, memory issues, bad parallelism or a slow algorithm might slap you back.
  • You can easily plug HDFS into this same exact pipeline. Increase the number of files or size, and the process will be able to take it. You can even take advantage of proper partitioning and putting data in memory.
  • It is equally possible to plug this algorithm into a distributed cluster, submit it to wherever the driver is and the workload is automatically distributed for you.
  • This specific NLP program shown here, is not really good for scalable solutions. Basically, collecting with collect() all words and tags down to the driver will blast your driver memory if you apply this into a few hundred megabytes of text files. Measuring performance in such a scenario will require proper map reduce tasks to count the number of POS matches. This explains why it takes longer in Spark-NLP than in spaCy to bring back a small number of predictions to the driver, but this will revert for larger inputs.

What’s next?

In this post, we compare the work for running and evaluating our benchmark NLP pipeline on both libraries. In general, your personal preference or experience may tilt you toward preferring the core Python libraries and imperative programming style with spaCy, or core Spark and functional programming style with Spark-NLP.

For the small data set we tested on here, runtime was below one second on both libraries, and accuracy was comparable. In part three of the blog series, we’ll evaluate on additional data sizes and parameters.

Article image: Fractal piping (source: Pixabay).