Chapter 4. Generate Data Observations

As explained in Chapter 3, data observability combines technology and people to gather information about the state of a system from the data perspective, and the expectations of that state. It then uses the information to make the system more adaptable or resilient.

This chapter explains how to apply the data observability practice. I will start with “data observability at the source,” a method to introduce collection strategies in your day-to-day data work, and I’ll show you how to minimize its impact on efficiency. Then, the chapter elaborates on implementing expectations that fit into the software delivery lifecycle, such as continuous integration and continuous deployment (CI/CD).

As with any emerging practice and technology, to increase data observability adoption, you need to lower the barrier to entry; that way, people have less reason to argue against the change. However, people are also part of the solution, as their involvement in the process is crucial to identify their expectations and codify the rules. To this end, you’ll learn several ways to reduce the effort required to generate observations and understand how to introduce them at the right moment of the development lifecycle.

At the Source

Chapter 2 explained the sources and types of information that help the observer. But how do you generate and collect the information from these sources?

It starts with data observability at the source. The term “source” refers to the applications responsible for reading, transforming, and writing data. These applications can either be the root cause of issues or the means to solve them, as explained in Chapter 3. Additionally, applications are within our control as engineers and organizations, unlike the data itself.

Hence, the method of data observability at the source turns on the ability for applications to generate data observations following the model covered in Chapter 2; in other words, the applications are made data observable.

Pure data and analytics observations—such as data sources, schemas, lineages, and metrics—correlate to reading, transforming, and writing activities. The strategies described in this section address these activities by explaining what to consider when running them.

Generating Data Observations at the Source

Producing data observations at the source begins by generating additional information capturing the behavior of specific activities with data: reading, transforming, and writing. For example, developers can add lines of logs that contain the information needed to generate visibility about what their application does. The instructions use the channels—that is, the logs, metrics, and traces or lineages—covered in Chapter 2 to convey the observations that can be centralized in a logging system.

In the next section, you learn how to create data observations in JSON format that can be collected (published) to local files, a local service, a distant (web) service, or similar destinations. For example, following the data observability core model in Chapter 2, the data source and schema entity of a Postgres table would look like Example 4-1.

Example 4-1. Example of data observations encoded as JSON
{
  "id": "f1813697-339f-5576-a7ce-6eff6eb63249",
  "name": "gold.crm.customer",
  "location": "main-pg:5432/gold/crm/table",
  "format": "postgres"
}
{
  "id": "e21ce0a8-a01e-5225-8a30-5dd809c0952e",
  "data_source_ref": {"by_id": "f1813697-339f-5576-a7ce-6eff6eb63249"},
  "fields": [
    { "name": "lastname", "type": "string", "nullable": true },
    { "name": "id", "type": "int", "nullable": false }
  ]
}

This ability to collect data observations in the same model on a centralized platform is key to generating the value of data observability at scale, such as across applications, teams, and departments. That’s why using the data observability core model is important to aggregating data observations with ease, especially along lineages.
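As a minimal sketch of the "local files" destination mentioned earlier, the snippet below appends each observation to a file as one JSON line and reads the file back. The `publish_observation` and `read_observations` helpers, the `entity` envelope key, and the short ids are assumptions made for this illustration; they are not part of the core model.

```python
import json
import os
import tempfile


def publish_observation(path: str, entity: str, payload: dict) -> None:
    """Append one observation to a local file as a single JSON line."""
    with open(path, "a", encoding="utf-8") as sink:
        sink.write(json.dumps({"entity": entity, **payload}) + "\n")


def read_observations(path: str) -> list[dict]:
    """Read back every observation published to the file."""
    with open(path, encoding="utf-8") as source:
        return [json.loads(line) for line in source if line.strip()]


# Placeholder ids; a real application would derive them from the entities.
data_source = {"id": "ds-1", "name": "gold.crm.customer", "format": "postgres"}
schema = {
    "id": "sc-1",
    "data_source_ref": {"by_id": data_source["id"]},
    "fields": [
        {"name": "lastname", "type": "string", "nullable": True},
        {"name": "id", "type": "int", "nullable": False},
    ],
}

observations_file = os.path.join(tempfile.mkdtemp(), "observations.jsonl")
publish_observation(observations_file, "data_source", data_source)
publish_observation(observations_file, "schema", schema)
```

Because every line is an independent JSON document, a collector can process the file incrementally and link the schema to its data source through `data_source_ref`.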

Let’s see how to generate data observations in Python using a low-level API, which will be used to introduce higher-level abstractions (covered in the next chapters).

Low-Level API in Python

This strategy of using a low-level API requires a lot of your time and involvement because you are responsible for creating every observation explicitly. However, this strategy also gives you the greatest flexibility, because it doesn’t involve any higher-level abstractions.

On the flip side, the support of data observability at this level, especially during exploration and maintenance, requires the developer to be consistent and always think about what they may want to observe in production (for example, any senior developer should be producing as many lines for logs and checks as for business logic).1 During development, developers must then create visibility about the modifications brought to the logic or behavior of the application with the generation of the associated observations. Examples of such observations include a connection to a new table, the creation of a new file, or the alteration of a structure with a new field.

In the following sections, you will go through a complete example of data applications written in Python that generate data observations alongside their data usage by doing the following:

  • Understanding applications without data observability capability

  • Adding instructions to generate data observations and their purpose

  • Gaining insights into the pros and cons of using this strategy

Description of the Data Pipeline

Throughout the remainder of the chapter, I will use a data pipeline written in Python that we will make data observable. The code of the pipeline on GitHub allows you to run the examples in this chapter. It uses the pandas library to manipulate CSV files and is composed of two applications, ingestion and reporting, as presented in Figure 4-1.

Figure 4-1. Structure of the example pipeline

Both applications (ingestion and reporting) use Python and pandas and share data from the “Daily Stock Prices” data source to create two downstream reports (BuzzFeed stock and AppTech).

The ingestion application reads the CSV files for the daily stock prices that the stock market team provides each month. The team partitions the files by year and month; the ingestion application then merges the prices into a monthly view stored as a separate file, as shown in Example 4-2.

Example 4-2. Data ingestion application
import pandas as pd

AppTech = pd.read_csv(  # 1
    "data/AppTech.csv",
    parse_dates=["Date"],  # 2
    dtype={"Symbol": "category"},  # 3
)
Buzzfeed = pd.read_csv(
    "data/Buzzfeed.csv",
    parse_dates=["Date"],
    dtype={"Symbol": "category"},
)

# Concatenating two categorical columns with different categories yields
# an object column, so Symbol is converted back to a category.
monthly_assets = pd.concat([AppTech, Buzzfeed]) \
    .astype({"Symbol": "category"})
monthly_assets.to_csv(  # 4
    "data/monthly_assets.csv", index=False
)

1 The ingestion application processes two CSV files for the following stocks:

  • BuzzFeed

  • AppTech

The application reads the files from the “data” folder. For reference, a row in these files looks like this:

Date,Symbol,Open,High,Low,Close,Adj Close,Volume
2022-01-03,APCX,14.725,15.200,14.25,14.25,14.25,1600

During exploration, our data engineer determined that the files have two fields that need to be preprocessed. 2 The Date field must be parsed as a date, 3 and the Symbol field must be treated as a category.

The application reads and processes the daily stocks before making them available in memory. Then, the application merges them into a single variable, which in this case is a new pandas DataFrame, containing all categories in Symbol.

4 The script writes the result into the same folder as a new file named monthly_assets.csv. This way, other analysts or data engineers can start there if they need all daily stock prices for further analysis.

After this first script runs and the file is available, the remaining pipeline scripts can run to generate the BuzzFeed and AppTech stock reports. This example has only one script, the reporting Python file, shown in Example 4-3.

Example 4-3. Data reporting application
import pandas as pd

all_assets = pd.read_csv("data/monthly_assets.csv", parse_dates=['Date'])  # 1

# .copy() makes each report an independent DataFrame, so adding a column
# below doesn't trigger pandas' SettingWithCopyWarning.
apptech = all_assets[all_assets['Symbol'] == 'APCX'].copy()  # 2
buzzfeed = all_assets[all_assets['Symbol'] == 'BZFD'].copy()

buzzfeed['Intraday_Delta'] = buzzfeed['Adj Close'] - buzzfeed['Open']  # 3
apptech['Intraday_Delta'] = apptech['Adj Close'] - apptech['Open']

kept_values = ['Open', 'Adj Close', 'Intraday_Delta']

buzzfeed[kept_values].to_csv("data/report_buzzfeed.csv", index=False)  # 4
apptech[kept_values].to_csv("data/report_appTech.csv", index=False)

This application performs the following actions:

1 Reads the data source created by the previous application

2 Filters out the data for the two reports it creates: BuzzFeed and AppTech.

3 Computes the Intraday_Delta scores, which are the daily stock evolutions to report.

4 Reports the Open, Adj Close, and the computed score Intraday_Delta values in the dedicated files for each stock.

To execute the pipeline appropriately, it is important to respect the dependencies between the reporting and ingestion applications. That is, you must run the ingestion application successfully before you run the reporting application. Otherwise, the pipeline will fail. A solution to achieve this is to use an orchestrator, such as Dagster or Airflow, that runs ingestion before reporting. In fact, the orchestrator is another application to configure by explicitly hardcoding the dependencies between applications at the data level. However, the applications themselves are still ignorant of their downstream dependencies.
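To make the idea concrete without tying it to Dagster or Airflow, here is a minimal sketch of what an orchestrator does at its core: it runs the steps in a hardcoded order and stops at the first explicit failure. The `run_pipeline` helper and the two stand-in step functions are hypothetical; a real orchestrator adds scheduling, retries, and much more.

```python
from typing import Callable


def run_pipeline(steps: list[tuple[str, Callable[[], None]]]) -> list[str]:
    """Run the steps in order and stop at the first explicit failure."""
    completed = []
    for name, step in steps:
        try:
            step()
        except Exception as error:
            print(f"{name} failed: {error!r}; downstream steps are skipped")
            break
        completed.append(name)
    return completed


def ingestion() -> None:
    # Simulated explicit failure: an input file is missing.
    raise FileNotFoundError("data/Buzzfeed.csv")


def reporting() -> None:
    print("reporting ran")


# The dependency lives only in this list, not in the applications themselves.
completed = run_pipeline([("ingestion", ingestion), ("reporting", reporting)])
```

Note that this only protects against explicit failures: if ingestion finishes without raising, reporting runs, whatever the state of the data.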

The flip side of hardcoding the dependencies in an orchestrator is that it is a new asset to maintain (e.g., the accuracy of the explicit dependencies). Also, additional constraints are imposed when creating a pipeline because teams must stay independent and therefore can’t simply update an existing pipeline to meet their own needs. Thus, an extension must be created at a higher level by adding a new, separated DAG, which could disconnect the explicit dependencies.

Returning to our pipeline, let’s discuss the functional dependencies between the applications; that is, ingestion must run successfully before reporting can be run. But what does successfully mean?

Definition of the Status of the Data Pipeline

To determine whether an execution of a data pipeline is successful, I will analyze the opposite question: What kind of failures can occur in the ingestion?

Explicit failure
This causes the application to crash. This type of failure is easy for the orchestrator to handle if you use one, because it’s a flag not to trigger the next application: the reporting application, in our example.
Silent failure
In this case, the application finishes, without, for example, an error code or log. Because it doesn’t run as expected, you must consider the notion of expectations introduced in Chapter 2.

An observer of the ingestion application would face explicit failures like these:

File not found errors
Occur when any of the data files, such as Buzzfeed.csv, aren’t available in the folder because it was renamed or changed to lowercase, or the file wasn’t created before running the ingestion application.
Type errors (TypeError)
Occur if some values aren’t coercible to match the types provided to the read_csv function, such as when the symbol should be a category.
Hardcoded name errors
Occur when any of the fields explicitly used in the code to access the values, such as the column names Date and Symbol in Example 4-2, aren’t present or the name has changed.
Filesystem errors
Happen when the files aren’t readable or the folder isn’t writable for the user who runs the application.
Memory errors
Happen when the files increase in size to the point that the memory allocated to the application is no longer sufficient.
System errors
Triggered if the disk doesn’t have any more space to write the aggregated result.

But from the perspective of the engineer who observes it, the following examples indicate silent failures:

  • The Date column isn’t parsable as a date because it’s malformed, the pattern changed, or the time zone isn’t consistent. In this case, the column isn’t a datetime anymore but an object.

  • The Date column contains values but not for the current month. All values are past or future dates.

  • The Date column contains values in the future because the generator might have run later and generated information about the future to compare to the month in which it’s processing. This situation can generate duplicates later, or inconsistent values for the same dates, and can likely fail some aggregations.

  • The Symbol category changes for various reasons, such as because of a typo or a capitalization change, length, or reference. This category is used across the files and is written in the output file as a category.

Also, the reporting application might regard the ingestion as failed because it provoked the following failures in reporting:

  • The monthly aggregated file wasn’t written when the reporting started, making it unavailable when the reporting tool is scheduled to run at a given interval.

  • Any of the fields used to filter in the arithmetic aren’t available or their name changed.

  • The same errors occur with read/write access, size, and space.

Also, reporting can fail silently due to the silent failures of the ingestion, plus:

  • The APCX and BZFD symbols aren’t available in the provided file.

  • Adj Close or Open are missing values, which generates garbage in the Intraday_Delta output. This problem can also be an explicit failure, in which case the values become pandas’ NA.

  • The aggregated file doesn’t contain information about the current month; all its dates are in the past.

Because any of these failures can occur, you must have visibility when they happen and—even better—anticipate them earlier to prevent their propagation (see “Fail Fast and Fail Safe”) from the ingestion application.

The explicit failures should already have been made visible, as a development practice, to catch those errors explicitly (try…except in Python). However, for an observer to identify and discover silent failures, they need the application to produce the appropriate observations.
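To illustrate how some of the silent failures listed above could be surfaced, the following sketch inspects a DataFrame for a Date column that is not a datetime, for missing rows in the expected month, and for missing symbols. The `find_silent_failures` function and its findings format are assumptions for this illustration, not part of the pipeline's code.

```python
import pandas as pd


def find_silent_failures(df: pd.DataFrame, year: int, month: int,
                         expected_symbols: set[str]) -> list[str]:
    """Return human-readable findings; an empty list means nothing suspicious."""
    findings = []
    if not pd.api.types.is_datetime64_any_dtype(df["Date"]):
        findings.append("Date is not a datetime column")
    else:
        in_month = (df["Date"].dt.year == year) & (df["Date"].dt.month == month)
        if not in_month.any():
            findings.append("no rows for the expected month")
    missing = expected_symbols - set(df["Symbol"].astype(str).unique())
    if missing:
        findings.append(f"missing symbols: {sorted(missing)}")
    return findings


healthy = pd.DataFrame({
    "Date": pd.to_datetime(["2022-01-03", "2022-01-04"]),
    "Symbol": ["APCX", "BZFD"],
})
malformed = pd.DataFrame({
    "Date": ["03/01/2022", "not a date"],  # plain strings, not datetimes
    "Symbol": ["APCX", "apcx"],            # capitalization change
})

print(find_silent_failures(healthy, 2022, 1, {"APCX", "BZFD"}))
print(find_silent_failures(malformed, 2022, 1, {"APCX", "BZFD"}))
```

Checks like these encode expectations by hand. The observations generated in the rest of this chapter aim to give an observer the same visibility without hardcoding each check.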

Data Observations for the Data Pipeline

In this section, I will provide an overview of the data observations the data pipeline must generate. For this, let’s have a quick glance at Figure 4-2, which shows how a low-level API implements the model presented in Chapter 2. It is interesting to note that the observations of the two applications share a similar structure and even some entities (labeled); in the next paragraphs, I will detail each portion individually to highlight these facts.

Figure 4-2. The observations for ingestion and reporting and their similarities (a larger version of this figure is available at https://oreil.ly/TaQGV)

In this diagram, you’ll notice labeled entities as uppercase letters A, B, C, and D in circles. The “A” data sources highlight the observations generated by the ingestion application about the data it generates, and by the reporting as it is reading it, hence making explicit the implicit dependency.

In fact, both applications generate several similar observations, which represent all dependencies that tie them together. In Figure 4-2, the following similar observations are also highlighted:

  • The “B” entities observe the server the data was retrieved from.

  • The “C” entities observe the user executing commands.

  • The “D” entities observe the schema of the data generated by the ingestion, as read by the reporting.

Let’s dig into what we need to add to the application’s code to generate the observations shown in Figure 4-2. Because the code is in Python, we’ll use the logging module to print the observations that are encoded in JSON.

Generate Contextual Data Observations

In this section, I will cover the code needed to generate observations about the execution context of the ingestion application shown in Figure 4-3 (note that reporting can reuse the same code).

Figure 4-3. Data observations about the execution context of the ingestion application

Insert the code in Example 4-4 at the beginning of the file to generate the observations for the ingestion application.

Example 4-4. Generate data observations about the ingestion application
import datetime
import getpass
import os

import git  # provided by the GitPython package

app_user = getpass.getuser()  # 1
repo = git.Repo(os.getcwd(), search_parent_directories=True)  # 2
code_repo = repo.remote().url
commit = repo.head.commit
code_version = commit.hexsha
code_author = commit.author.name
application_name = os.path.basename(os.path.realpath(__file__))  # 3
application_start_time = datetime.datetime.now().isoformat()

1 Retrieve user name information.

2 Retrieve git information.

3 Retrieve information about the application execution.

The extra instructions in Example 4-4 create variables for the observations, but nothing is done with them yet. To log the information as mentioned previously, we use a JSON representation of the information model, encoded as in Example 4-5.

Example 4-5. Modeling data observations about the ingestion runtime
application_observations = { 
   "name": application_name,
   "code": {
       "repo": code_repo,
       "version": code_version,
       "author": code_author
   },
   "execution": {
       "start": application_start_time,
       "user": app_user
   }
}

This code creates the JSON that’s formed with all the observations created so far. However, this section is about using a low-level API for data observability. As we move on, we will encounter a similar pattern, giving us the opportunity to create functions to simplify the code and share it across the ingestion and reporting applications, or any other Python applications in the future.
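For completeness, here is a minimal sketch of the logging step itself, using only the standard library. The `log_observations` and `current_user` helpers are hypothetical, and the repository, version, and author values are placeholders standing in for the variables retrieved from git.

```python
import datetime
import getpass
import json
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")


def current_user() -> str:
    """Return the user name, or a fallback when it can't be resolved."""
    try:
        return getpass.getuser()
    except Exception:  # e.g., in containers without a user entry
        return "unknown"


def log_observations(observations: dict) -> str:
    """Encode the observations as JSON and send them to the logging channel."""
    encoded = json.dumps(observations)
    logging.info(encoded)
    return encoded


application_observations = {
    "name": "ingestion.py",  # placeholder for application_name
    "code": {
        "repo": "https://example.com/pipeline.git",  # placeholder for code_repo
        "version": "0000000",  # placeholder for code_version
        "author": "jane",  # placeholder for code_author
    },
    "execution": {
        "start": datetime.datetime.now().isoformat(),
        "user": current_user(),
    },
}
encoded = log_observations(application_observations)
```

Logging one JSON document per line keeps the observations easy to parse for whichever system centralizes the logs.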

To create an API, we create a model that mimics the observation core model in JSON, turning each entity into a class and converting relations into references (see Example 4-6).

Example 4-6. Modeling the application data observations using dedicated classes
class Application:  # 1
    name: str

    # The annotation is quoted because ApplicationRepository is defined below.
    def __init__(self, name: str, repository: "ApplicationRepository") -> None:
        self.name = name
        self.repository = repository

    def to_json(self):
        return {"name": self.name, "repository": self.repository.to_json()}


class ApplicationRepository:  # 2
    location: str

    def __init__(self, location: str) -> None:
        self.location = location

    def to_json(self):
        return {"location": self.location}


app_repo = ApplicationRepository(code_repo)
app = Application(application_name, app_repo)

That means the application entity has to have an Application 1 class with a property name that can hold the name of the file as an application_name variable, and a reference to an ApplicationRepository instance. This ApplicationRepository entity will be encoded as an ApplicationRepository 2 class that has a property location set as the git remote location. This structure will help build the model and generate a JSON representation more easily that will be reusable and able to lead to standardization.

An additional benefit of encoding concepts into API classes is that it gives them the responsibility to propose helpers to extract associated observations, as in Example 4-7 1.

Example 4-7. Leverage classes to define helpers related to modeled entities
import os


class ApplicationRepository:
    location: str

    # [...]

    @staticmethod
    def fetch_git_location():  # 1
        import git
        code_repo = git.Repo(os.getcwd(), search_parent_directories=True).remote().url
        return code_repo


class Application:
    name: str

    # [...]

    @staticmethod
    def fetch_file_name():  # 1
        application_name = os.path.basename(os.path.realpath(__file__))
        return application_name


app_repo = ApplicationRepository(ApplicationRepository.fetch_git_location())
app = Application(Application.fetch_file_name(), app_repo)

This strategy might be a straightforward way to implement the model. However, we prefer another approach that weakens the links between entities. With the design in Example 4-7, all information is logged in one JSON document, with entities spread across a tree that has Application at its root. This encoding forces us to create all observations before logging the root one, which is the Application instance. The Application constructor would become something like Example 4-8.

Example 4-8. Bloated constructor for Application without separation of concerns
class Application:
    name: str
    def __init__(self, name: str, version: ApplicationVersion, 
                 repo: ApplicationRepository, execution: ApplicationExecution, 
                 server: Server, author: User) -> None:
        pass

To avoid this complexity and constraint, a better way is to reverse the dependencies between the entities. Instead of having the Application contain its ApplicationVersion or ApplicationRepository, we’ll create the Application alone and then add a weak reference to it from within the ApplicationVersion and ApplicationRepository. Example 4-9 shows how this model would look.

Example 4-9. Reversing the dependencies between entities and introducing id
import os
from hashlib import md5


class ApplicationRepository:
    location: str
    application: Application
    id: str

    def __init__(self, location: str, application: Application) -> None:
        self.location = location
        self.application = application
        # The primary key (location and application id) is hashed into the id.
        id_content = ",".join([self.location, self.application.id])
        self.id = md5(id_content.encode("utf-8")).hexdigest()  # 1

    def to_json(self):
        return {"id": self.id,
                "location": self.location,
                "application": self.application.id}

    @staticmethod
    def fetch_git_location():
        import git
        code_repo = git.Repo(os.getcwd(),
                             search_parent_directories=True).remote().url
        return code_repo

With this model, we can log the observations individually, with one call to logging.info per entity, reducing the amount of information to retain. Because we need to recompose the entity relationships, we introduce the id variable to reduce the amount of information to log and observations to link. Using the logs, an observer can reconstruct the links in the model from the ids, like the dependency between the ApplicationRepository and the Application, because the referenced entities have already been logged.

If the application generated the id locally, for example as a random value, the design would be poor because the id would be inconsistent across several executions. To circumvent this problem, we must define a functional id that can identify the entities across executions, deployments, and applications. This notion is known as the primary key in modeling. You can use a primary key as an input in a hashing algorithm, for example, that will generate the id in a deterministic fashion, in this case using hashlib.

Example 4-9 illustrates how to use the primary key to generate the id consistently, for example, by using md5 1. We’ll use this strategy throughout this chapter to generate entities. 
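The following self-contained sketch puts the two ideas together: ids derived deterministically from a primary key, and entities logged one by one with references by id. The `functional_id` helper, the choice of the name as the Application's primary key, and the repository URL are assumptions made for this illustration.

```python
import hashlib
import json
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")


def functional_id(*primary_key: str) -> str:
    """Hash the primary key so the same entity always gets the same id."""
    return hashlib.md5(",".join(primary_key).encode("utf-8")).hexdigest()


class Application:
    def __init__(self, name: str) -> None:
        self.name = name
        self.id = functional_id(name)  # assumption: the name is the primary key

    def to_json(self) -> dict:
        return {"id": self.id, "name": self.name}


class ApplicationRepository:
    def __init__(self, location: str, application: Application) -> None:
        self.location = location
        self.application = application
        self.id = functional_id(location, application.id)

    def to_json(self) -> dict:
        # The application is referenced by id only (a weak reference).
        return {"id": self.id, "location": self.location,
                "application": self.application.id}


app = Application("ingestion.py")
app_repo = ApplicationRepository("https://example.com/pipeline.git", app)  # placeholder URL
logging.info(json.dumps(app.to_json()))
logging.info(json.dumps(app_repo.to_json()))
```

Running the script twice produces the same ids, so observations logged by different executions can be linked to the same entities.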

Wrap-Up: The Data-Observable Data Pipeline

Before moving on with the analyses of how the observations help with the explicit and silent failures introduced at the beginning of this section, we reuse what we’ve done so far for the ingestion application to make the reporting application data observable. See Example 4-19.

Example 4-19. Reporting application with verbose data observations generated from code
import pandas as pd

# The API classes (Application, DataSource, Schema, ...) are assumed to be
# defined or imported above.
app = Application(Application.fetch_file_name())
app_repo = ApplicationRepository(ApplicationRepository.fetch_git_location(), app)
git_user = User(ApplicationVersion.fetch_git_author())
app_version = ApplicationVersion(ApplicationVersion.fetch_git_version(), git_user, app_repo)
current_user = User("Emanuele Lucchini")
app_exe = ApplicationExecution(app_version, current_user)

all_assets = pd.read_csv("data/monthly_assets.csv", parse_dates=['Date'])

apptech = all_assets[all_assets['Symbol'] == 'APCX'].copy()
buzzfeed = all_assets[all_assets['Symbol'] == 'BZFD'].copy()

buzzfeed['Intraday_Delta'] = buzzfeed['Adj Close'] - buzzfeed['Open']
apptech['Intraday_Delta'] = apptech['Adj Close'] - apptech['Open']

kept_values = ['Open', 'Adj Close', 'Intraday_Delta']

buzzfeed[kept_values].to_csv("data/report_buzzfeed.csv", index=False)
apptech[kept_values].to_csv("data/report_appTech.csv", index=False)

all_assets_ds = DataSource("data/monthly_assets.csv", "csv")
all_assets_sc = Schema(Schema.extract_fields_from_dataframe(all_assets), 
                 all_assets_ds)
buzzfeed_ds = DataSource("data/report_buzzfeed.csv", "csv")
buzzfeed_sc = Schema(Schema.extract_fields_from_dataframe(buzzfeed), buzzfeed_ds)
apptech_ds = DataSource("data/report_appTech.csv", "csv")
apptech_sc = Schema(Schema.extract_fields_from_dataframe(apptech), apptech_ds)
# First lineage
lineage_buzzfeed = OutputDataLineage(buzzfeed_sc,
                                    OutputDataLineage.generate_direct_mapping
                                    (buzzfeed_sc, [all_assets_sc]))
lineage_buzzfeed_exe = DataLineageExecution(lineage_buzzfeed, app_exe)
all_assets_ms_1 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets),  # 1
                   all_assets_sc, lineage_buzzfeed_exe)
buzzfeed_ms = DataMetrics(DataMetrics.extract_metrics_from_df(buzzfeed), buzzfeed_sc, 
               lineage_buzzfeed_exe)
# Second lineage
lineage_apptech = OutputDataLineage(apptech_sc,
                                    OutputDataLineage.generate_direct_mapping
                                    (apptech_sc, [all_assets_sc]))
lineage_apptech_exe = DataLineageExecution(lineage_apptech, app_exe)
all_assets_ms_2 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets),  # 1
                   all_assets_sc, lineage_apptech_exe)
apptech_ms = DataMetrics(DataMetrics.extract_metrics_from_df(apptech), apptech_sc, 
              lineage_apptech_exe)

By adding the observations this way, we keep the modifications similar to what we did in the ingestion application. This approach enables us to build habits and abstractions, such as a framework, that will reduce the number of changes needed—almost an implicit law in development.

In Example 4-19, notice that the observations generated for the inputs shifted to the end 1. We made this implementation choice for the simplicity of the example. An upside is the additional computations are done at the end without impacting the business flow. A downside is that, if something fails in between, no observations about the data sources and their schema are sent. Of course, it is possible to avoid this situation with some adaptations to the code. Also, for this introduction to a low-level API, we have to add some boilerplate to generate the right information, which may sound like noise, proportionally to the business code. However, keep in mind that the script initially had no logs. In general, logs are added sporadically to generate some information about the script’s own behavior, which is what we’ve done, but for the data.
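One possible adaptation, sketched here with plain Python structures rather than the chapter's API, is to emit the input observations right after reading, before the business logic runs. The `emit` collector and the `report` function are hypothetical stand-ins; the point is only that the input observations survive a failure that happens later.

```python
observations: list[dict] = []


def emit(observation: dict) -> None:
    """Stand-in for logging the observation."""
    observations.append(observation)


def report(rows: list[dict]) -> list[dict]:
    # Observe the input as soon as it is read, before any business logic runs.
    emit({"entity": "schema", "fields": sorted(rows[0]) if rows else []})
    emit({"entity": "metrics", "row_count": len(rows)})
    # Business logic that may fail, here when a field is missing.
    return [{"Intraday_Delta": r["Adj Close"] - r["Open"]} for r in rows]


try:
    report([{"Open": 14.725, "Close": 14.25}])  # 'Adj Close' is missing
except KeyError as error:
    print(f"reporting failed on missing field {error}")

print(observations)
```

Even though the run fails, the emitted schema shows that the input had no Adj Close field, which is exactly the information an observer needs to diagnose the failure.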

Also, keep these points in mind:

  1. We reused the OutputDataLineage.generate_direct_mapping helper as is to create lineage mapping between the outputs and input. However, it won’t work, because we aggregated Adj Close and Open from the monthly_assets.csv file into the new Intraday_Delta column. Because the fields don’t have the same name, the “direct” heuristic won’t pick up this dependency.

  2. A warning message is displayed about reporting the same observations a second time for the monthly_assets. We did this because we encoded lineage per output. We now have two lineages, one for each of the report_buzzfeed.csv and report_appTech.csv files. Because the output reuses the same data as the input (filtered), we must report what the input looks like for each output to avoid them appearing as duplicates. As an alternative, we could reuse the observations or adapt the model to address this duplication. Before doing so, however, consider the following points:

    • If we change our strategy to read every time the data is accessed instead of loading it in memory, then if the data changes between the two writing operations, the observations will no longer be identical. If one output has issues, we prefer to synchronize the observations of the input with this lineage. The probability of this situation increases when you consider each writing operation can take hours, not seconds.

    • The reporting application generates all outputs at every run, but refactoring later can change this and make it parameterizable. For example, only one output might be created, such as BuzzFeed. Therefore, each reporting dataset is generated by independent runs. In this case, the observations already represent this appropriately, so we don’t need to adjust the logic. In other words, sending the observations of a given input as often as it is used to create an output represents reality, rather than trying to optimize to reduce data that might look like duplicates.

Let’s address the first point and ensure that the lineage can represent the real connections between the data sources. To do this in a simplified manner, we will update the helper function used in Example 4-19 with extra information. Example 4-20 1 shows the new version of this helper, which now includes an argument to provide nondirect mapping for each input. Later sections present strategies to handle this common use case in a much easier, more efficient, more maintainable, and more accurate way, such as by using monkey patching.

Example 4-20. Generated field level lineage based on matching field names
@staticmethod
def generate_direct_mapping(output_schema: Schema,
                            input_schemas: list[tuple[Schema, dict]]):
    input_schemas_mapping = []
    output_schema_field_names = [f[0] for f in output_schema.fields]
    for (schema, extra_mapping) in input_schemas:
        # Direct heuristic: an input field maps to the output field of the same name.
        mapping = {}
        for field in schema.fields:
            if field[0] in output_schema_field_names:
                mapping[field[0]] = [field[0]]
        # Add the mappings that the name-based heuristic can't infer.
        mapping.update(extra_mapping)  # 1
        if len(mapping):
            input_schemas_mapping.append((schema, mapping))
    return input_schemas_mapping
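To see the effect of the extra mapping, here is a self-contained sketch of the same heuristic run against two small schemas. The `Schema` class below is a simplified stand-in whose fields are (name, type) tuples, and the helper is written as a plain function; both are assumptions made for this illustration.

```python
class Schema:
    """Hypothetical stand-in: fields are (name, type) tuples."""

    def __init__(self, fields: list[tuple[str, str]]) -> None:
        self.fields = fields


def generate_direct_mapping(output_schema, input_schemas):
    input_schemas_mapping = []
    output_names = [f[0] for f in output_schema.fields]
    for schema, extra_mapping in input_schemas:
        # Fields with the same name in input and output are mapped directly.
        mapping = {f[0]: [f[0]] for f in schema.fields if f[0] in output_names}
        # The caller supplies what the name-based heuristic can't infer.
        mapping.update(extra_mapping)
        if mapping:
            input_schemas_mapping.append((schema, mapping))
    return input_schemas_mapping


all_assets_sc = Schema([("Date", "datetime64[ns]"), ("Symbol", "category"),
                        ("Open", "float64"), ("Adj Close", "float64")])
report_sc = Schema([("Open", "float64"), ("Adj Close", "float64"),
                    ("Intraday_Delta", "float64")])

direct_only = generate_direct_mapping(report_sc, [(all_assets_sc, {})])
with_extra = generate_direct_mapping(
    report_sc, [(all_assets_sc, {"Intraday_Delta": ["Adj Close", "Open"]})])
print(direct_only[0][1])
print(with_extra[0][1])
```

Without the extra mapping, Intraday_Delta has no lineage at all; with it, the column is linked to the two input fields it is computed from.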

Example 4-21 shows the final observations part of the reporting application.

Example 4-21. Reporting application made data observable with clean code
# First lineage
intraday_delta_mapping = {"Intraday_Delta": ['Adj Close', 'Open']}
lineage_buzzfeed = OutputDataLineage(buzzfeed_sc,
                                    OutputDataLineage.generate_direct_mapping(
                                        buzzfeed_sc, [(all_assets_sc,
                                                       intraday_delta_mapping)]))
lineage_buzzfeed_exe = DataLineageExecution(lineage_buzzfeed, app_exe)
all_assets_ms_1 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                   all_assets_sc, lineage_buzzfeed_exe)
buzzfeed_ms = DataMetrics(DataMetrics.extract_metrics_from_df(buzzfeed), 
               buzzfeed_sc, lineage_buzzfeed_exe)
# Second lineage
lineage_apptech = OutputDataLineage(apptech_sc,
                                   OutputDataLineage.generate_direct_mapping(
                                    apptech_sc, [(all_assets_sc, 
                                                 intraday_delta_mapping)]))
lineage_apptech_exe = DataLineageExecution(lineage_apptech, app_exe)
all_assets_ms_2 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                   all_assets_sc, lineage_apptech_exe)
apptech_ms = DataMetrics(DataMetrics.extract_metrics_from_df(apptech), apptech_sc, 
              lineage_apptech_exe)

Using Data Observations to Address Failures of the Data Pipeline

Now we can deploy, run, and monitor our pipeline by using the observations its applications generate each time they run. The low-level API required quite a bit of extra effort and conviction to implement. Nevertheless, we are satisfied with the results. Following the 1-10-100 rule of total quality cost, every minute spent on those tasks will return 100 times the benefit in avoided revenue loss when issues happen in production.

Let’s look at the issues mentioned earlier in this section, starting with the ingestion application failures:

Input files not found
The DataSource observations are sent at every run, after the files have been read. The failing runs, therefore, do not send any of them. Even for someone without any knowledge of the application logic, it’s clear that the files used until now are missing.
Type errors at read
The Schema observations are sent and contain the field names associated with their type. Therefore, the expected type is clear for the observer without going to the application or the files for the previous months.
Errors due to missing fields
The same observations as for “Type errors at read” help the observer quickly identify which fields were present in the previous runs that are missing in the current one.
Filesystem errors
The exception thrown by the pandas library normally provides the path that resulted in an error. The remaining information the observer needs to identify the issue is which server this path was on. The IP provided in the server observations associated with the DataSource gives instant visibility into which server was related to that path.
Memory error

This issue happens mostly when the data increases suddenly. Many cases can be considered, but they are mostly handled intuitively by the observer using DataMetrics observations that contain the number of rows, the schema with the number of fields, or the number of DataSources. Regardless, it might require the observations to be sent earlier than at the end of the script, such as in the following two cases:

  • One of the input files has a larger size than before. The file is detected easily because no DataMetrics are available for it.

  • The output has grown a lot because all files have grown. The size difference is detected because the DataMetrics for the outputs are missing. Also, the DataMetrics for the inputs show an increase in the number of rows.

Filesystem space errors

These errors most likely happen when writing the output, considering that we’re addressing data observability cases here. Therefore, the same information as for “Memory error” gives the observer instant visibility about why the available space is no longer sufficient and which files couldn’t be written.

The Date is not parsable as a date

  • In this case, the Schema observation shows the type of the date field changing from date to something else, such as str or object.

Date column doesn’t contain values for the current year/month

  • The DataMetrics observations include the minimum and maximum time stamp, which gives instant visibility into the difference between the execution time and the available data. For example, if the maximum time stamp is two days before the time when the data source is read, and the acceptable period is only one day, then the data can be considered too old.

Date column contains values in the future

  • This one is easy: the same kind of observations as for the previous failure (missing values for the current month) will give you this visibility.

Symbol categories changed

  • If we consider only numeric DataMetrics, we can quickly identify this case by using the number of categories that would grow in the output file. One or some of the files wouldn’t be consistent anymore, as they would refer to different categories.
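
The date-related cases above come down to comparing the maximum time stamp reported in the DataMetrics observations with the time of the read. A minimal sketch, assuming hypothetical names (`check_date_range`, `max_age`) and plain `datetime` values rather than the chapter's API:

```python
from datetime import datetime, timedelta


def check_date_range(max_ts, read_time, max_age=timedelta(days=1)):
    """Return the date issues visible from the maximum time stamp alone."""
    issues = []
    if read_time - max_ts > max_age:
        issues.append("stale")   # newest record is older than the accepted period
    if max_ts > read_time:
        issues.append("future")  # records are dated after the read
    return issues


read_time = datetime(2023, 3, 10, 8, 0)
stale = check_date_range(datetime(2023, 3, 8), read_time)
future = check_date_range(datetime(2023, 3, 12), read_time)
fresh = check_date_range(datetime(2023, 3, 10), read_time)
print(stale, future, fresh)
```

The one-day default mirrors the acceptable period used in the example above; in practice the threshold would depend on how often the source is refreshed.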

Next, we must consider the circumstances under which the reporting application might regard the ingestion application as failed and, where applicable, which observations the reporting application’s observer could use. These situations include the following:

The monthly aggregated file is not available

  • The DataSource observation or DataMetrics hasn’t been sent by the ingestion application.

Aggregation uses missing fields, such as Close

  • The schema of the monthly data sent by the ingestion application is missing those fields too.

Errors with read/write access, size, and space

  • The same solutions apply to the reporting observer as the ingestion observer. There is no bias of information across teams or team members.

APCX and ENFA symbols

  • The number of categories reported by the ingestion observer has changed, giving hints of what is happening, in some cases. However, we can extend DataMetrics to also report non-numerical observations and report the categories.

Missing values in Adj Close or Open leading to abnormal numbers

  • The DataMetrics “number of nulls” covers this case because when the number of nulls is greater than zero, the computation of the Intraday_Delta will return NAs.

Wrong date in the monthly assets

  • The same solutions used by the ingestion observer related to date failures apply here. For example, the application could use the minimum and maximum value for the Date column compared to the currently reported month.
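
Two of the cases above rely on metrics beyond numeric summaries: the number of nulls, and the categories themselves. The following sketch shows one way such non-numerical metrics could be computed and compared between runs. The function names and the dictionary layout are assumptions made for illustration, not part of the chapter's DataMetrics class, and the symbol values are sample data.

```python
def categorical_metrics(values):
    """Summarize a non-numerical column: null count and distinct categories."""
    non_null = [v for v in values if v is not None]
    return {
        "nulls": len(values) - len(non_null),
        "categories": sorted(set(non_null)),
    }


def compare_categories(previous, current):
    """Report which categories appeared or disappeared between two runs."""
    prev, curr = set(previous["categories"]), set(current["categories"])
    return {"added": sorted(curr - prev), "removed": sorted(prev - curr)}


# Sample Symbol columns from two consecutive runs (illustrative values).
last_month = categorical_metrics(["AAPL", "APCX", "ENFA", "AAPL"])
this_month = categorical_metrics(["AAPL", "APCX", None, "MSFT"])
diff = compare_categories(last_month, this_month)
print(this_month["nulls"], diff)
```

Reporting the categories, not only their count, lets the observer see which symbols changed even when the count stays the same, as it does in this sample.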

We’re now ready to handle various situations wherein the data is the source of a problem. Without this knowledge, these situations would require long, high-stress meetings to understand them and exhausting hours or days spent debugging that could turn into months, because we can’t access the data in production.

The issues discussed so far are ones we know can happen. However, many other issues that we know little about yet are likely to propagate throughout the pipeline: the unknown-unknowns. For example, the values of one of the stocks are incorrect for a previous month due to errors introduced into the data during the CSV export. This problem, and others like it, will happen for at least one of your applications.

Because of these unknown-unknowns, data observations mustn’t be restricted to predefined cases. Instead, the applications must report as many observations as possible, within the constraints on computation resources and time, to generate visibility about unanticipated or unmet situations. In this example, the distribution of the monthly stock values would be useful later for comparison with other months, and it could provide hints about whether the values are equal or similar.
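
As a rough illustration of reporting a distribution for later comparison, the sketch below summarizes a numeric column and measures how far the mean of one month has moved from another. The function names and the sample values are hypothetical, and the relative mean shift is only one of many possible comparisons.

```python
import statistics


def distribution_summary(values):
    """Summarize a numeric column so that later runs can be compared to it."""
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "mean": statistics.fmean(values),
        "stdev": statistics.pstdev(values),
    }


def mean_shift(reference, candidate):
    """Relative change of the mean between two summaries."""
    return abs(candidate["mean"] - reference["mean"]) / abs(reference["mean"])


january = distribution_summary([10.0, 11.0, 9.0, 10.0])
# Values ten times larger, as an export error might produce.
february = distribution_summary([100.0, 110.0, 90.0, 100.0])
shift = mean_shift(january, february)
print(shift)
```

Because the summaries are cheap to store, they can be kept for every run and compared whenever a question about a past month arises.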

The advantage of using low-level logging is having full flexibility over what you can generate as visible. Examples are custom metrics and key performance indicators (KPIs).

Not all jobs are equal; every company, project, and application has its own specific details. You’re likely to control specific metrics, whether they are linked to consumed data or produced data. Such a metric, for a table, could be the number of items multiplied by the cost per unit, minus the amount received from a web service: count(items) * cost_per_unit - amount_received. The result must always be greater than zero. This can easily be added to the source code, but it has to be added by the engineer, as it constitutes a specific metric associated with the business logic (and the semantics of the columns).
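
A custom metric of this kind could be reported as one more observation. The sketch below assumes the rule described above (item count times unit cost, minus the amount received, must stay above zero); the function name, the metric name, and the dictionary layout are hypothetical.

```python
def custom_cost_observation(items, cost_per_unit, amount_received):
    """Compute a business-specific metric and record whether it is valid."""
    value = len(items) * cost_per_unit - amount_received
    return {
        "metric": "items_cost_minus_received",  # hypothetical metric name
        "value": value,
        "valid": value > 0,  # the business rule: must be greater than zero
    }


ok = custom_cost_observation(["a", "b", "c"], 2.5, 5.0)
broken = custom_cost_observation(["a"], 2.5, 5.0)
print(ok, broken)
```

Sending the value along with the validity flag keeps the history of the metric available, so the observer can see how close it was to the limit before it failed.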

Another reason for customizing observations is KPIs, the numbers stakeholders request that are important to the underlying business. KPIs are often reported periodically or computed on demand and used at random or fixed intervals. However, their meaning is so strong that stakeholders have high expectations for them, with little to no time to wait for corrections. Therefore, you must also create visibility about how KPIs evolve over time because, if a stakeholder has doubts about a KPI, time starts ticking the moment they ask you about its correctness. To increase responsiveness, you must generally know how the KPIs evolve, identify changes before the stakeholders do, and understand why they change based on their historical values and lineage.

As you might have anticipated during the reporting application update, defining the API—the model, encoding, and functions—isn’t a task for each application. Rather, you must standardize and reuse it across applications. Standardization reduces the amount of work per application. More importantly, it enables the observations to be homogeneous, independently of the application, to simplify the observers’ work in matching the behaviors of other applications that participate in the pipeline they are analyzing.

Standardization is also helpful for reusing entities across applications, such as the assets_monthly DataSource, which is the ingestion application’s output and the reporting application’s input. With the homogeneous representation of the observations, you can consolidate the status of the entire pipeline by reusing the entities across applications.

Part of the architecture to support data observability must include creating an external system that aggregates the observations to build a global view, systematically. By having observers that rely on this aggregated view and act upon it, the system can evolve to perform some of the actions currently being done by the observers, which is where machine learning comes into play.
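
To make the idea of a global view concrete, here is a minimal sketch of an aggregation step that groups observations from several applications by the data source they share. The flat dictionary format and the `role` field are assumptions made for illustration; they are not the encoding used by the chapter's API.

```python
from collections import defaultdict


def build_pipeline_view(observations):
    """Group applications by the data sources they write and read."""
    view = defaultdict(lambda: {"written_by": [], "read_by": []})
    for obs in observations:
        key = "written_by" if obs["role"] == "output" else "read_by"
        view[obs["data_source"]][key].append(obs["application"])
    return dict(view)


# Hypothetical observations sent by the two applications of the pipeline.
observations = [
    {"application": "ingestion", "data_source": "assets_monthly", "role": "output"},
    {"application": "reporting", "data_source": "assets_monthly", "role": "input"},
]
view = build_pipeline_view(observations)
print(view)
```

Because both applications describe assets_monthly in the same way, the aggregated view links them without any knowledge of their internal logic.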

Conclusion

This chapter has provided a comprehensive exploration of data observability at the source and its significance in enhancing data quality and operational excellence. We delved into the concept of generating data observations within the code of data applications, highlighting the importance of incorporating observation generation code across various components of the application. These components include the application itself, the data being utilized, their relationships, and the content they hold.

Additionally, we discussed the creation of a data observability Python API at the low level, which offers a powerful toolset for developers to seamlessly integrate data observation capabilities into their applications. Through this API, practitioners gain the ability to generate data observations, track data flows, and ensure the reliability and accuracy of their data.

To reinforce these concepts, we presented a fully working example that showcased the transformation of a non-data-observable data pipeline written in Python into a robust and data-observability-driven pipeline. By leveraging the dedicated data observability Python API, we demonstrated how data observations can be generated, captured, and utilized to enhance visibility, identify issues, and drive continuous improvement.

As we move forward, the principles and strategies explored in this chapter serve as a foundation for incorporating data observability into the very fabric of data applications. By adopting these practices, organizations can ensure that their data pipelines are robust, reliable, and capable of providing valuable insights with a high level of trust.

Despite the hyper-customizability and flexibility of low-level logging, its adoption can be hindered by the initial effort required. This argument also applies to the adoption of tests. Therefore, it is crucial to simplify the usage complexity at this level. Additionally, we need to explore alternative approaches that complement low-level logging while promoting the widespread adoption of data observability across teams and individuals. The upcoming sections will delve into this subject, commencing with an exploration of event-based systems.

1 This is what led me to start thinking about the data-observability-driven development method.
