Apache Spark for atom-smashing experiments

Crunching CERN’s colossal data with scalable analytics

By Siddha Ganju
June 9, 2016
Tape library, CERN, Geneva 2 Tape library, CERN, Geneva 2 (source: Cory Doctorow on Flickr)

Experiments at CERN, the European Organization for Nuclear Research, generates a vast amount of data. Typically, the volume of data processed is around 15PB, which becomes significantly higher when experiments at the Large Hadron Collider are in the running phase. It is critical for physicists who explore this colossal data to determine if the collisions of sub-atomic particles could lead to pioneering investigations or new insights in particle physics.

During an atom-smashing experiment, massive collisions take place within the Large Hadron Collider. As particles collide at high speeds, they decay in complex ways and pass through sub-detectors that register each particle’s passage. The sub-detectors contain microprocessors that collect the data. This data corresponds to the particle’s path and electrical energy dissipation. A digitized summary of each collision event is recorded, replicated, and distributed through a complex topology of sites for researchers to use. On average, the distributed data is accessed by 250 users who send up to 200,000 jobs per day. This data is collected over time to predict the popular data sets.

Learn faster. Dig deeper. See farther.

Join the O'Reilly online learning platform. Get a free trial today and find answers on the fly, or master something new and useful.

Learn more

Preparation and exploratory analytics with static as well as live data of petabyte-scale is extremely challenging. Researchers at CERN have developed a unique and promising two-stage process to tackle this dynamic challenge: first, identify which data is important and second, apply machine learning techniques to forecast popular data sets.

Identifying and forecasting popular data sets

The Compact Muon Solenoid (CMS) Detectors at the Large Hadron Collider are recording massive amounts of data, and it is important to prepare the data to identify popular data sets. This is the data preparation stage. The popularity metrics are based on:

  • nusers/day: number of users per day
  • naccesses: number of times a data set is accessed
  • totcpu: CPU usage in terms of hours

At run time when the CMS detector is producing the data, how can we predict if a data set will become popular in the future? How do we know whether scientists will want to look at the data distributed at various sites? This data is raw; it needs to be optimized against false positives and false negatives. Hence, threshold values for naccess, nusers and totcpu are applied to the data sets, through which popular data sets are predicted.

The second stage is where machine learning is brought to train and apply models on the prepared data. The rolling forecast model is trained using different machine learning algorithms, like Random Forest, Naive Bayes, and Stochastic Gradient Descent, to compare their performance. The popular data for the upcoming week is predicted and verified on the popularity database. It is then merged with the old data and the new data is collected. Thus, historical data helps to predict future popular data sets.

Based on the popularity of old data, CERN’s scientists can understand variations in popularity over time and predict whether the data will again become valuable to physicists in the future. It is important to note that out of the 600 million events per second (collision rate), many collisions are similar, which aid in adding to the history of that collision type.

Using Spark at CERN: Lessons learned

All the work of preparing data and applying machine learning algorithms was done in Spark. Spark was especially suited to the CMS data because of its persistence capabilities. I learned a lot after using Spark on the CERN data. Below is a consolidation of a few key tips and points:

Spark and Scala vs. Spark and Python

Spark is simple to learn if you are already acquainted with the Scala programming model. I used PySpark, which is built upon the functional programming features in Python. I was therefore able to utilize Lambda calculus for most of the transformations.


I think the most intuitive and important rule of MapReduce is the principle of locality (or locality of reference), which explains the frequency of access to related storage devices. Locality is either temporal (reuse of previously accessed data within a relatively short time) or spatial (close storage locations). Even in Spark, the principle of locality finds its place and must be utilized effectively.

Persist capability

Spark has the essential persist capability, which can be called by the persist() or cache() functions. Spark operators fall into two categories: transformations and actions. Transformations constitute a majority of all operations and are lazy; the operation is deferred until an action operator is encountered or the persist function is called. Persist needs to be used carefully. For instance, call persist on an RDD when the RDD will be reused in the future. The fault-tolerant cache operation will store all partitions on the node’s disk, which can then be reused by other actions. If fault tolerance is not a concern, then it is preferable to avoid caching in storage and keep all operations in memory. This leads to significant performance gains.

Rich transformations

Spark offers many transformations, such as map, filter, union, intersect, and distinct. These transformations allow RDDs to be dependent on each other. So, in a chain of RDDs, each child can be obtained from its parent using some transformation. The execution of the transformations is divided into stages and is carried out by designated workers. One thing to keep in mind is to make these transformations efficient, because if the chain becomes too long, the whole process will slow down. The general rule of thumb is that too many transformations cause high overhead.

Avoid excessive joins

Avoid too many joins if possible. To perform a particular task, Spark builds a hash table each time a join is called within that task. The hash table can be extremely large, and this makes the join memory intensive. This can often be avoided by making efficient joins or by increasing the level of parallelism, but the latter is not always possible. So, to be on the safe side, it is always a good idea to avoid too many joins.

Ensure algorithm parameters are constant

Another thing to keep in mind is that while comparing machine learning algorithms, their parameters must be kept same. So, while comparing the state-of-the-art implementations in Python’s Scikit-learn and Spark, the parameters for each algorithm were kept constant. If the parameters are not kept constant, then there would be no basis for comparing the results of the algorithms by the different frameworks. Comparison must always be made with similar kinds of entities, and keeping the parameters the same maintains the results of similar entities.

Note: CMS has released 300TB of data to the public. You don’t have to be a CERN member to play with all this data—the data is available here.

Post topics: Big Data Tools and Pipelines