Comparing production-grade NLP libraries: Running Spark-NLP and spaCy pipelines
A step-by-step guide to building and running a natural language processing pipeline.
In part one of this blog series, we introduced and then trained models for tokenization and part-of-speech tagging using two libraries—John Snow Labs’ NLP for Apache Spark and Explosion AI’s spaCy. In this part, we’ll go over building and running a natural language processing (NLP) pipeline that applies these models to new free-text data.
Plugging in our data is a challenging step, since our data is raw, heterogeneous, and not bounded into sentences. I am working with a folder full of .txt files, and I need to save the results in word-tag format, keyed by filename, so I can compare them to the correct answers later. Let’s work it out:
spaCy
import io
import os
import re
import time

# nlp_model is the spaCy pipeline trained in part one of this series
start = time.time()
path = "./target/testing/"
files = sorted([path + f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
prediction = {}
for file in files:
    fo = io.open(file, mode='r', encoding='utf-8')
    content = []
    # iterating over the Doc yields tokens; keep (text, tag) pairs per file
    for token in nlp_model(re.sub("\\s+", ' ', fo.read())):
        content.append((token.text, token.tag_))
    prediction[file] = content
    fo.close()
print(time.time() - start)
Another way of computing this, in parallel, would be to use generators and spaCy’s language pipe. Something like this could work:
spaCy
from spacy.language import Language
import itertools

def genf():
    path = "./target/testing/"
    files = sorted([path + f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
    for file in files:
        fo = io.open(file, mode='r', encoding='utf-8')
        t = re.sub("\\s+", ' ', fo.read())
        fo.close()
        yield (file, t)

gen1, gen2 = itertools.tee(genf())
files = (file for (file, text) in gen1)
texts = (text for (file, text) in gen2)

start = time.time()
prediction = {}
for file, doc in zip(files, nlp_model.pipe(texts, batch_size=10, n_threads=12)):
    content = []
    for d in doc:
        content.append((d.text, d.tag_))
    prediction[file] = content
print(time.time() - start)
Spark-NLP
var data = spark.read.textFile("./target/testing").as[String]
  .map(_.trim()).filter(_.nonEmpty)
  .withColumnRenamed("value", "text")
  .withColumn("filename", regexp_replace(input_file_name(), "file://", ""))

data = data.groupBy("filename")
  .agg(concat_ws(" ", collect_list(data("text"))).as("text"))
  .repartition(12)
  .cache

val files = data.select("filename").distinct.as[String].collect

val result = model.transform(data)

val prediction = Benchmark.time("Time to collect predictions") {
  result
    .select("finished_token", "finished_pos", "filename").as[(Array[String], Array[String], String)]
    .map(wt => (wt._3, wt._1.zip(wt._2)))
    .collect.toMap
}
Since this is Apache Spark, I can read a folder of text files with just textFile(), although it reads line by line. I need to identify the filename for each line so I can combine the lines back together, and luckily input_file_name() does exactly that. I then group by filename and concatenate the lines with a whitespace.
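For readers more at home in Python, roughly the same loading step can be sketched with PySpark. The snippet below is only an illustration of input_file_name() and the group-and-concatenate trick, not the code used in this benchmark; it assumes the same ./target/testing folder as above:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Each row returned by spark.read.text is a single line of some file;
# input_file_name() records which file that line came from.
lines = (spark.read.text("./target/testing")
         .withColumnRenamed("value", "text")
         .withColumn("filename", F.regexp_replace(F.input_file_name(), "file://", "")))

# Stitch the lines back together, one row per file, joined by a single space.
docs = (lines.groupBy("filename")
        .agg(F.concat_ws(" ", F.collect_list("text")).alias("text")))

docs.show(truncate=40)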
Note that neither of the two code snippets above uses code that is unique to the NLP libraries: spaCy relies on Python’s file operations, and Spark-NLP relies on Spark’s native data set loading and processing primitives. Note also that the Spark code above will work just the same on an input of 10 kilobytes, 10 gigabytes, or 10 terabytes, if the Spark cluster is correctly configured. Likewise, your learning curve for each library depends on your familiarity with its surrounding ecosystem.
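What “correctly configured” means depends on your cluster and data, but as a rough illustration, a local run of this small benchmark might be set up along these lines. The values below are assumptions for the example, not tuned recommendations:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("nlp-pipeline-benchmark")
         .master("local[12]")  # on a real cluster this would be yarn, or a spark:// master URL
         # the default of 200 shuffle partitions is sized for far larger data than ours
         .config("spark.sql.shuffle.partitions", "12")
         # on a cluster, per-executor memory matters too; in local mode driver and executors share one JVM
         .config("spark.executor.memory", "4g")
         .getOrCreate())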
Measuring the results
If what we have done so far seemed hard, this step is harder still. How can we measure POS accuracy when the answers we have are tokenized differently? I had to pull off some magic here, and I am sure it will be controversial. There was no easy way to compute the results objectively and match them fairly, so here is what I came up with.
spaCy
First, I need to process the answers folder, which is a folder full of .txt files that look exactly like the training data and carry the same filenames as the testing data we used in the previous step.
answers = {}
total_words = 0
for file in result_files:
    fo = io.open(file, mode='r', encoding='utf-8')
    file_tags = []
    for pair in re.split("\\s+", re.sub("\\s+", ' ', fo.read())):
        total_words += 1
        tag = pair.strip().split("|")
        file_tags.append((tag[0], tag[-1]))
    answers[file] = file_tags
    fo.close()
print(total_words)
Spark-NLP
For this one, I reused the same function that parses word|tag tuples for the POS annotator. It belongs to a helper object called ResourceHelper, which has a lot of similar functions for working with data.
var total_words = 0
val answer = files.map(_.replace("testing", "answer")).map(file => {
  val content = ResourceHelper
    .parseTupleSentences(file, "TXT", '|', 500)
    .flatMap(_.tupleWords)
  total_words += content.length
  (file, content)
}).toMap
println()
println(total_words)
And here is where the magic happens. I have the answers, which in both spaCy and Spark-NLP look like a dictionary of (filename, array((word, tag))), and this is the same format we have in the prediction step.
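To make that shape concrete, here is a toy illustration of the two dictionaries; the filenames mirror the ones above, and the entries are made up for the example:

# One entry per file: a list of (word, tag) pairs, in document order.
prediction = {
    "./target/testing/20000424_nyt-NEW.txt": [("Washington", "NNP"), ("(", "("), (")", ")")],
}
answers = {
    "./target/answer/20000424_nyt-NEW.txt": [("Washington", "NNP"), ("(", "("), (")", ")")],
}
# Same keys modulo the testing/answer folder, same (word, tag) structure.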
So I created a small algorithm that runs through every prediction-answer pair and compares the words in them. For every matched word, I count a matched token, and for every matching tag on a matched word, I count a success.
But, since words are tokenized differently than in the ANC results, I need to open a small window in which a word is given a chance to match a certain number of rolling ANC tokens, or else tell me where to continue the search in the file.
If the predicted word is sub-tokenized (the prediction produces fewer tokens than ANC has), the word will never match and is ignored. For example, for two-legged|JJ where ANC has two|NN and legged|JJ, the algorithm collects ANC tokens until it confirms the word is sub-tokenized and then ignores it (construct is the concatenation of those sub-tokens).
Then the index is placed at the latest match, so if the predicted word appears later (because the previous word was sub-tokenized), it will eventually be found in range and counted.
Here’s how it looks in code:
spaCy
start = time.time()
word_matches = 0
tag_matches = 0
for file in list(prediction.keys()):
    last_match = 0
    print("analyzing: " + file)
    for pword, ptag in answers[file.replace('testing', 'answer')]:
        print("target word is: " + pword)
        last_attempt = 0
        construct = ''
        for word, tag in prediction[file][last_match:]:
            if word.strip() == '':
                last_match += 1
                continue
            construct += word
            print("against: " + word + " or completion of construct " + pword)
            last_attempt += 1
            if pword == word:
                print("word found: " + word)
                if ptag == tag:
                    print("match found: " + word + " as " + tag)
                    tag_matches += 1
                word_matches += 1
                last_match += last_attempt
                break
            elif pword == construct:
                print(pword + " construct complete. No point in keeping the search")
                last_match += last_attempt
                break
            elif len(pword) <= len(construct):
                print(pword + " construct larger than target. No point in keeping the search")
                if (pword in construct):
                    last_match += last_attempt
                break
print(time.time() - start)
Runtime
analyzing: ./target/testing/20000424_nyt-NEW.txt
target word is: IRAQ-POVERTY
against: IRAQ-POVERTY or completion of construct IRAQ-POVERTY
word found: IRAQ-POVERTY
target word is: (
against: ( or completion of construct (
word found: (
match found: ( as (
target word is: Washington
against: Washington or completion of construct Washington
word found: Washington
match found: Washington as NNP
target word is: )
against: ) or completion of construct )
word found: )
match found: ) as )
target word is: Rep.
against: Rep or completion of construct Rep.
against: . or completion of construct Rep.
Rep. construct complete. No point in keeping the search
As we can see, it will give tokens a chance to find a mate; this algorithm basically keeps tokens synchronized, nothing more.
print("Total words: " + str(total_words)) print("Total token matches: " + str(word_matches)) print("Total tag matches: " + str(tag_matches)) print("Simple Accuracy: " + str(tag_matches / word_matches))
Runtime
Total words: 21381
Total token matches: 20491
Total tag matches: 17056
Simple Accuracy: 0.832365428724806
Spark-NLP
We see similar behavior in Spark-NLP, perhaps with a little more mixing of imperative and functional programming:
var misses = 0
var token_matches = 0
var tag_matches = 0
for (file <- prediction.keys) {
  var current = 0
  for ((pword, ptag) <- prediction(file)) {
    println(s"analyzing: ${pword}")
    var found = false
    val tags = answer(file.replace("testing", "answer"))
    var construct = ""
    var attempt = 0
    tags.takeRight(tags.length - current).iterator.takeWhile(_ => !found && construct != pword).foreach { case (word, tag) => {
      construct += word
      println(s"against: $word or if matches construct $construct")
      if (word == pword) {
        println(s"word match: $pword")
        token_matches += 1
        if (tag == ptag) {
          println(s"tag match: $tag")
          tag_matches += 1
        }
        found = true
      } else if (pword.length < construct.length) {
        if (attempt > 0) {
          println(s"construct $construct too long for word $pword against $word")
          attempt -= attempt
          misses += 1
          println(s"failed match our $pword against their $word or $construct")
          found = true
        }
      }
      attempt += 1
    }}
    current += attempt
  }
}
Runtime
analyzing: NYT20020731.0371
against: NYT20020731.0371 or if matches construct NYT20020731.0371
word match: NYT20020731.0371
analyzing: 2002-07-31
against: 2002-07-31 or if matches construct 2002-07-31
word match: 2002-07-31
analyzing: 23:38
against: 23:38 or if matches construct 23:38
word match: 23:38
analyzing: A4917
against: A4917 or if matches construct A4917
word match: A4917
analyzing: &
against: & or if matches construct &
word match: &
tag match: CC
We then compute the measures:
println("Total words: " + (total_words)) println("Total token matches: " + (token_matches)) println("Total tag matches: " + (tag_matches)) println("Simple Accuracy: " + (tag_matches * 1.0 / token_matches))
Runtime
Total words: 21362
Total token matches: 18201
Total tag matches: 15318
Simple Accuracy: 0.8416021097741883
Spark-NLP, but where’s Spark?
I haven’t said much yet about where Spark fits into all this. So, without boring you much further, I’ll put everything in easy-to-read bullets:
- You guessed it! This task is overkill for Spark. You don’t need four hammers for a single small nail. By that I mean that Spark works in a distributed fashion by default, and its defaults are meant for quite big data (e.g., spark.sql.shuffle.partitions is 200). In this example, I’ve tried to keep the data under control and not expand it too much. “Big data” can become an enemy: memory issues, bad parallelism, or a slow algorithm might slap you back.
- You can easily plug HDFS into this exact same pipeline. Increase the number or size of the files, and the process will be able to take it. You can even take advantage of proper partitioning and of caching data in memory.
- It is equally possible to run this algorithm on a distributed cluster: submit it to wherever the driver is, and the workload is automatically distributed for you.
- The specific NLP program shown here is not really suited for scalable solutions. Collecting all words and tags down to the driver with collect() will blow up your driver memory once you apply it to a few hundred megabytes of text files; measuring performance at that scale would require proper map-reduce tasks to count the number of POS matches, as in the sketch below. This also explains why it takes longer in Spark-NLP than in spaCy to bring back a small number of predictions to the driver, but the comparison reverses for larger inputs.
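To make that last point concrete, here is a rough sketch of how the match counting could be pushed into Spark so that only two totals ever reach the driver. It assumes a hypothetical DataFrame called joined with per-file array columns pred_words, pred_tags, ans_words, and ans_tags (for example, the pipeline output joined with a parsed-answers DataFrame on filename), and it uses a simplified position-by-position comparison instead of the rolling-window alignment above:

from pyspark.sql import functions as F, types as T

# Hypothetical input: `joined` has one row per file with array columns
# pred_words, pred_tags, ans_words, ans_tags.
@F.udf(returnType=T.ArrayType(T.LongType()))
def count_matches(pred_words, pred_tags, ans_words, ans_tags):
    # Simplified alignment: compare position by position. A faithful port would
    # reuse the rolling-window logic shown earlier to absorb tokenization differences.
    token_matches, tag_matches = 0, 0
    for pw, pt, aw, at in zip(pred_words, pred_tags, ans_words, ans_tags):
        if pw == aw:
            token_matches += 1
            if pt == at:
                tag_matches += 1
    return [token_matches, tag_matches]

counts = (joined
          .withColumn("m", count_matches("pred_words", "pred_tags", "ans_words", "ans_tags"))
          .agg(F.sum(F.col("m")[0]).alias("token_matches"),
               F.sum(F.col("m")[1]).alias("tag_matches")))

counts.show()  # only the two aggregated totals are brought back to the driver

The simple accuracy is then just tag_matches divided by token_matches, computed from those two numbers on the driver.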
What’s next?
In this post, we compared the work involved in running and evaluating our benchmark NLP pipeline in both libraries. In general, your personal preference or experience may tilt you toward the core Python libraries and imperative programming style of spaCy, or toward the core Spark and functional programming style of Spark-NLP.
For the small data set we tested on here, runtime was below one second on both libraries, and accuracy was comparable. In part three of the blog series, we’ll evaluate on additional data sizes and parameters.