Pydantic and Elasticsearch: Dynamic Couple for Data Management

By combining Pydantic and Elasticsearch for data management, we can create structured schemas, perform automatic data validation, and enhance data consistency.

By Dmitrii Sedov · Jun. 09, 23 · Analysis
Elasticsearch is a highly scalable open-source full-text search and analytics engine. It allows you to store, search, and analyze large volumes of data quickly and in near real-time.

An Elasticsearch index is a collection of documents that are related to each other. Each document is a collection of fields, which are the key-value pairs that contain your data. An index is loosely comparable to a table in a relational database. Older Elasticsearch versions also had mapping 'types' inside an index, but these were deprecated in 6.x and removed in later releases.

Mapping is the process of defining how a document and the fields it contains are stored and indexed. For each index, you can define its mapping — a set of rules that explains how to interpret the data that's being inserted into the index. You may think of mapping as the schema definition, which would be equivalent to the table definition in SQL.

Each field in a document can have a different data type, and the mapping tells Elasticsearch what kind of data each field holds. The most common types are:

  • Numeric (long, double)
  • Boolean
  • Keyword
  • Text
  • Date
  • Nested
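
To make these types concrete, here is a minimal sketch of a hand-written mapping expressed as a Python dictionary. The field names (isbn, pages, reviews, and so on) are hypothetical and chosen only for illustration; the layout follows the "properties" structure that Elasticsearch expects in a mapping.

```python
# A hand-written mapping for a hypothetical "book" document.
# All field names are illustrative assumptions, not part of a real index.
book_mapping = {
    "properties": {
        "isbn": {"type": "keyword"},         # exact-match identifier
        "title": {"type": "text"},           # analyzed for full-text search
        "pages": {"type": "long"},
        "rating": {"type": "double"},
        "in_print": {"type": "boolean"},
        "publish_date": {"type": "date"},
        "reviews": {                         # list of objects, each queried on its own
            "type": "nested",
            "properties": {
                "reviewer": {"type": "keyword"},
                "comment": {"type": "text"},
            },
        },
    }
}

# Collect the distinct field types used at the top level of the mapping
used_types = sorted({field["type"] for field in book_mapping["properties"].values()})
print(used_types)
```

Writing and maintaining a dictionary like this by hand for every index is the kind of chore that a schema library can take over.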

These types tell Elasticsearch how to store the documents in the index. If you have a complex schema with many different field types, some of them nested, and you want to make sure your data is stored in the index the way you intend, you need something to validate the input data and automate the creation of indexes with a specific schema. That's where Pydantic comes into play!

What Is Pydantic and Why Do You Need It?

Pydantic is a Python library that provides data validation and settings management using Python type annotations. Its primary feature is validating incoming data: it ensures that the data received matches the developer's expectations, which reduces the likelihood of runtime errors caused by incorrect data. It is especially useful when building APIs or data-parsing modules, where data correctness is paramount.
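
As a small illustration of that behavior, the sketch below validates two dictionaries against a model. The User model and the try_parse helper are hypothetical names introduced only for this example.

```python
from pydantic import BaseModel, ValidationError


class User(BaseModel):  # hypothetical model, for illustration only
    name: str
    age: int


def try_parse(data: dict):
    """Return a User instance, or the ValidationError if the data is invalid."""
    try:
        return User(**data)
    except ValidationError as exc:
        return exc


ok = try_parse({"name": "Ada", "age": 36})
bad = try_parse({"name": "Ada", "age": "not a number"})

print(type(ok).__name__)   # the valid input becomes a model instance
print(type(bad).__name__)  # the invalid input yields a ValidationError
```

The error object lists each failing field, which makes it straightforward to report exactly what was wrong with the input.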

Let's make some assumptions about using Pydantic schemas with Elastic and then try to prove them.

Index Creation Does Not Require Writing the Mapping by Hand; It Can Be Described Explicitly by a Pydantic Schema

Let's say we're creating a Book document for our Elasticsearch index. We can define a Pydantic model for that document and use it both to create the index and to validate input data.

So, here is our script to create the index:

Python
 
from datetime import datetime
from typing import List, Type, get_args, get_origin

from pydantic import BaseModel, validator
from elasticsearch import Elasticsearch

# Define the index name and its number of shards and replicas at the start of the script
target_index = "books_v1.0"
n_shards = 3
n_replicas = 1

class Title(BaseModel):
    english: str

class Abstract(BaseModel):
    english: str
    french: str
    spanish: str

class Book(BaseModel):
    id: str
    title: List[Title]
    author: List[str]
    publish_date: datetime
    abstract: List[Abstract]

    # pre=True runs this validator on the raw input, before Pydantic parses the field itself
    @validator("publish_date", pre=True)
    def validate_date(cls, date: str):
        try:
            # Keep only the YYYY-MM-DD part and parse it
            return datetime.strptime(str(date)[0:10], "%Y-%m-%d")
        except ValueError:
            raise ValueError(f"Date field has wrong format: {date}")


type_map = {str: "keyword", datetime: "date", int: "long", list: "keyword", dict: "nested", List[BaseModel]: "nested"}

def create_es_mapping(pydantic_model: Type[BaseModel]) -> dict:
    mapping = {}
    for field, field_type in pydantic_model.__annotations__.items():
        # Any Elasticsearch field can hold several values, so List[X] is mapped like X
        is_list = get_origin(field_type) is list
        inner_type = get_args(field_type)[0] if is_list else field_type
        if isinstance(inner_type, type) and issubclass(inner_type, BaseModel):
            # Sub-models get their own properties; lists of sub-models become nested fields
            es_field = {"properties": create_es_mapping(inner_type)}
            if is_list:
                es_field["type"] = "nested"
        else:
            # Raises KeyError for Python types that type_map does not cover
            es_field = {"type": type_map[inner_type]}
        mapping[field] = es_field
    return mapping

def main(model):
    # Assumes a local, unsecured cluster; adjust the URL for your environment
    es = Elasticsearch("http://localhost:9200")

    # Generate the index body from the Pydantic schema; it consists of 2 main sections: settings and mappings
    mapping = {
        "settings": {
            "number_of_shards": n_shards,
            "number_of_replicas": n_replicas
        },
        "mappings": {
            "properties": create_es_mapping(model)
        }
    }

    # Create the Elasticsearch index if it does not exist yet
    if not es.indices.exists(index=target_index):
        es.indices.create(index=target_index, body=mapping)

if __name__ == "__main__":
    main(model=Book)


This script is designed to create an Elasticsearch index from a Pydantic model. It defines the models (including a validator that is applied later, when data is loaded), builds the index mapping, and creates the index if it doesn't already exist. Here's a step-by-step description of what we do above:

  1. Imports necessary libraries: This includes datetime for handling date data, List from typing for type hinting, BaseModel from Pydantic for model definition, and Elasticsearch for interacting with an Elasticsearch cluster.
  2. Defines Constants: target_index is the name of the index to be created, n_shards is the number of primary shards the index should have, and n_replicas is the number of replicas each primary shard should have.
  3. Defines Pydantic models: These are Title, Abstract, and Book. Each model corresponds to a part of a book document. These models will be used to validate data and create the Elasticsearch index mapping.
  4. Data validation: Inside the Book class, there is a validator for the publish_date field. It ensures that the date is in the correct format and raises a ValueError if it isn't.
  5. Creates Elasticsearch field type mapping: type_map is a dictionary that maps Python types to Elasticsearch field types, and the create_es_mapping function uses it. This mapping is central to the approach, so let's look at it more closely: {str: "keyword", datetime: "date", int: "long", list: "keyword", dict: "nested", List[BaseModel]: "nested"}. The keys are Python types and the values are Elasticsearch field types. The index mapping is generated from the values, while Pydantic validates input data against the Python types. This is convenient for developers and easy to follow in code review.
  6. Generates Elasticsearch mapping: The create_es_mapping function generates an Elasticsearch mapping from a Pydantic model. It uses the type_map to determine the Elasticsearch field type for each Pydantic field.
  7. Defines Main Function: The main function does several things:
       • It creates an Elasticsearch client instance.
       • It generates the Elasticsearch mapping from the provided Pydantic model and creates a mapping body that includes settings for the number of shards and replicas.
       • It checks if the target index exists in Elasticsearch. If it doesn't, it creates the index using the indices.create method of the Elasticsearch client.
  8. Executes main function: The script ends by running the main function if the script is being run as the main program. It passes the Book model as an argument to the function.
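
To inspect what such a generated mapping looks like without a running cluster, the following sketch applies the same idea to a cut-down model and prints the result. It is a simplified variant under stated assumptions, not the script above: MiniBook, SCALARS, and to_properties are hypothetical names, and only str, int, and datetime fields are handled.

```python
import json
from datetime import datetime
from typing import List, get_args, get_origin

from pydantic import BaseModel

# Python scalar types and the Elasticsearch types assumed for them in this sketch
SCALARS = {str: "keyword", datetime: "date", int: "long"}


class Title(BaseModel):
    english: str


class MiniBook(BaseModel):  # hypothetical, reduced version of a book model
    id: str
    title: List[Title]
    author: List[str]
    publish_date: datetime


def to_properties(model) -> dict:
    """Translate a model's annotations into an Elasticsearch "properties" dict."""
    props = {}
    for name, annotation in model.__annotations__.items():
        # List[X] is unwrapped to X, since any field may hold several values
        is_list = get_origin(annotation) is list
        inner = get_args(annotation)[0] if is_list else annotation
        if isinstance(inner, type) and issubclass(inner, BaseModel):
            props[name] = {"type": "nested", "properties": to_properties(inner)}
        else:
            props[name] = {"type": SCALARS[inner]}
    return props


properties = to_properties(MiniBook)
print(json.dumps(properties, indent=2))
```

Printing the dictionary before sending it to the cluster is a cheap way to review the mapping and catch an unexpected field type early.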

So, what did we just do? Our script automated the process of creating an Elasticsearch index from a Pydantic model, which helps ensure data consistency and reduces manual work. You can also keep the script in version control (on GitHub, for example) to track how your index schema evolves over time, and reproduce the index whenever and wherever you want. That is exactly what we wanted to achieve!
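
One possible way to make schema changes visible in version control is to also store the generated index body as a JSON file, so that every change shows up as a readable diff. The sketch below uses only the standard library; the index_body content and the save_index_body helper are hypothetical, and a temporary directory stands in for a repository checkout.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical index body; in practice it would be produced by the schema script
index_body = {
    "settings": {"number_of_shards": 3, "number_of_replicas": 1},
    "mappings": {
        "properties": {
            "id": {"type": "keyword"},
            "publish_date": {"type": "date"},
        }
    },
}


def save_index_body(body: dict, path: Path) -> None:
    """Write the index body as stable, diff-friendly JSON."""
    # sort_keys and a fixed indent keep the file identical between runs
    path.write_text(json.dumps(body, indent=2, sort_keys=True) + "\n", encoding="utf-8")


# A temporary directory stands in for the repository checkout
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "books_v1.0.json"
    save_index_body(index_body, target)
    restored = json.loads(target.read_text(encoding="utf-8"))

print(restored == index_body)
```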

Data Validation With Pydantic Schema Is Easy To Conduct and Guarantees Data Consistency

We defined the schema for the index books_v1.0 in the Python script above (let's call it books_schema.py) and created the index. Now we can upload some data, validating each document against the schema before it is indexed. Here is a Python script for this task:

Python
 
from typing import Type

from elasticsearch import Elasticsearch
from pydantic import BaseModel

from books_schema import Book, target_index


def to_upload(documents: list, index: str, model: Type[BaseModel]):
    # Relies on the module-level client `es` created in the main block below
    for document in documents:
        # Raises pydantic.ValidationError if the document does not match the model
        prepared_doc = model.parse_obj(document)
        # Send the validated data to Elasticsearch as a plain dict
        es.index(index=index, id=document["id"], body=prepared_doc.dict())

if __name__ == "__main__":
    # Initialize Elasticsearch client (assumes a local, unsecured cluster)
    es = Elasticsearch("http://localhost:9200")

    # Example dataset
    books_data = [
        {
            "id": "1",  # illustrative ID; the Book model requires this field
            "title": [{"english": "To Kill a Mockingbird"}],
            "author": ["Harper Lee"],
            "publish_date": "1960-07-11",
            "abstract": [
                {
                    "english": "The unforgettable novel of a childhood in a sleepy Southern town and the crisis of conscience that rocked it.",
                    "french": "Le roman inoubliable d'une enfance dans une petite ville du Sud endormie et la crise de conscience qui l'a ébranlée.",
                    "spanish": "La inolvidable novela de una infancia en una tranquila ciudad sureña y la crisis de conciencia que la sacudió."
                }
            ]
        },
        #  more book data here...
    ]

    to_upload(documents=books_data, index=target_index, model=Book)


This script uploads a list of documents (in this case, books) to an Elasticsearch index. It also validates these documents against a Pydantic model before they are uploaded.

Here's a detailed breakdown of what's happening:

  1. Imports: The script starts by importing Book schema and target_index from books_schema.py. It also imports BaseModel from pydantic.
  2. Function definition: The to_upload function is defined. This function takes three arguments: documents (a list of documents to be uploaded), index (the name of the Elasticsearch index to which the documents will be uploaded), and model (a Pydantic model against which the documents will be validated). For each document in documents, the function validates the document against model and then indexes the document in Elasticsearch.
  3. Main script: The main part of the script is executed when the script is run directly (not imported as a module).
  • It first creates an instance of the Elasticsearch client.
  • It then creates a list of book data (books_data). Each item in this list is a dictionary representing a book.
  • Finally, it calls the to_upload function, passing books_data, target_index, and Book as the arguments. This validates and uploads each book in books_data to the Elasticsearch index specified by target_index. If the publish_date field does not pass validation, or if some data types do not correspond to those in the Book class, an error is raised and the data is not uploaded to the index.

We validate the input data against a pre-defined schema using Python's type annotations. This ensures that the data received for Elasticsearch indexing matches what's expected, reducing the likelihood of invalid data being indexed and causing errors later. If there's a mismatch, Pydantic will raise an error, enabling you to catch issues before they become more significant problems.
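
For larger datasets, you may prefer to collect invalid documents rather than stop at the first error. The sketch below shows one way to do that, assuming a hypothetical Doc model and prepare_actions helper. The dictionaries it builds use the "_index", "_id", and "_source" keys accepted by the bulk helper of the Python client (elasticsearch.helpers.bulk); the call itself is left out so that the example runs without a cluster.

```python
from typing import List, Tuple, Type

from pydantic import BaseModel, ValidationError


class Doc(BaseModel):  # hypothetical stand-in for the Book model
    id: str
    title: str


def prepare_actions(documents: List[dict], index: str, model: Type[BaseModel]) -> Tuple[list, list]:
    """Split raw documents into bulk actions and (document, error) pairs."""
    actions, rejected = [], []
    for document in documents:
        try:
            parsed = model.parse_obj(document)
        except ValidationError as exc:
            # Keep the bad document and the reason, then move on to the next one
            rejected.append((document, exc))
            continue
        actions.append({"_index": index, "_id": parsed.id, "_source": parsed.dict()})
    return actions, rejected


raw = [{"id": "1", "title": "To Kill a Mockingbird"}, {"title": "missing id"}]
actions, rejected = prepare_actions(raw, "books_v1.0", Doc)
print(len(actions), len(rejected))
```

The rejected list can then be logged or written to a separate file for review, while only the validated documents are sent to the index.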

What Else Can I Do With Pydantic?

Now that we know index creation can be automated and how easy it is to validate data before uploading it, let's see what else we can do to reduce the likelihood of runtime errors caused by incorrect data.

Pydantic supports several kinds of validation. Here are some:

Data Conversion

Python
 
from pydantic import BaseModel, validator

class Product(BaseModel):
    name: str
    price: float

    @validator("price", pre=True)
    def convert_price(cls, price):
        try:
            return float(price)
        except ValueError:
            raise ValueError(f"Could not convert price to float: {price}")


The @validator decorator marks the convert_price function as a validator for the price field. The pre=True argument means that this function will be run before any other validation. Inside the convert_price function, we attempt to convert the provided price to a float, and if this conversion is not possible (for example, if the price is a string that cannot be converted to a float), a ValueError will be raised.
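
A short usage sketch may help here. It repeats the Product model so that it runs on its own; the sample values are made up, and catching TypeError in addition to ValueError is an assumption added to cover inputs such as None.

```python
from pydantic import BaseModel, ValidationError, validator


class Product(BaseModel):
    name: str
    price: float

    @validator("price", pre=True)
    def convert_price(cls, price):
        try:
            return float(price)
        except (TypeError, ValueError):
            raise ValueError(f"Could not convert price to float: {price}")


# A numeric string is converted to a float
product = Product(name="Notebook", price="4.50")
print(product.price)

# A string that is not a number is rejected
try:
    Product(name="Notebook", price="four fifty")
    failed = False
except ValidationError:
    failed = True
print(failed)
```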

Validating Numbers

Python
 
from pydantic import BaseModel, conint

class Model(BaseModel):
    age: conint(gt=0, lt=150)  # age should be greater than 0 and less than 150

model = Model(age=25)  # This is valid
model = Model(age=150)  # This will raise a validation error


You can use the conint (constrained integer) and confloat (constrained float) types provided by Pydantic to validate numeric fields.

Validating Choices

Python
 
from pydantic import BaseModel
from enum import Enum

class FruitEnum(str, Enum):
    apple = "apple"
    banana = "banana"
    cherry = "cherry"

class Model(BaseModel):
    fruit: FruitEnum

model = Model(fruit="apple")  # This is valid
model = Model(fruit="mango")  # This will raise a validation error
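
Declaring a field with an Enum type restricts it to the listed values, and the parsed attribute is the enum member itself rather than a plain string. The following sketch illustrates this with hypothetical Fruit and Basket classes.

```python
from enum import Enum

from pydantic import BaseModel, ValidationError


class Fruit(str, Enum):  # hypothetical enum, for illustration only
    apple = "apple"
    banana = "banana"


class Basket(BaseModel):
    fruit: Fruit


basket = Basket(fruit="banana")
print(basket.fruit is Fruit.banana)  # the string was converted to the enum member
print(basket.fruit.value)            # the underlying string value

# A value outside the enum is rejected
try:
    Basket(fruit="mango")
    rejected = False
except ValidationError:
    rejected = True
print(rejected)
```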


Custom Validation (Most Used)

Python
 
from pydantic import BaseModel, validator

class Model(BaseModel):
    name: str

    @validator('name')
    def name_must_contain_space(cls, v):
        if ' ' not in v:
            raise ValueError('must contain a space')
        return v.title()  # this will convert the name to title case

model = Model(name="John Hopkins")  # This is valid
model = Model(name="JackDaniels")  # This will raise a validation error


Here, we check whether the name field contains a space. If it does not, a validation error is raised; if it does, the name is returned in title case.

Methods We Used

To illustrate the power of combining Pydantic and Elasticsearch, we followed a practical approach of designing a data validation and index generation pipeline. The entire process was divided into several stages to demonstrate a clear and reproducible method.

  1. Understanding the Basics: We first provided a concise explanation of Elasticsearch and Pydantic, describing their primary functions and why they are widely used tools in the field of data management.
  2. Defining Elasticsearch Mappings: An explanation was provided about Elasticsearch mapping, which is essentially the process of defining how a document should be mapped to the search engine, including its searchable characteristics. For example, defining fields as date, integer, string, or using more complex types like nested and object fields.
  3. Creating Pydantic Models: After understanding the Elasticsearch mapping, we demonstrated how to translate it into a Pydantic model. For instance, the Book model, composed of nested Title and Abstract models, was used to represent the structure of book data. This model also includes data validation techniques like checking the format of the publish_date field.
  4. Generating Elasticsearch Index Mapping from the Pydantic Model: An automated Python script was presented which generates the Elasticsearch index mapping directly from the Pydantic model. This is a crucial step that enhances the accuracy of data mapping and reduces the chances of human error.
  5. Creating Elasticsearch Index: We utilized the Elasticsearch client and the mapping generated in the previous step to create an Elasticsearch index. The created index replicates the structure defined by our Pydantic model.
  6. Data Validation and Indexing: Once the index was created, we demonstrated how to validate and index data using our Pydantic model. We made use of Pydantic's parse_obj method, which not only validates the data but also converts it into a model instance. Any errors encountered during validation are raised immediately, thus ensuring data quality and integrity.

This method serves as a guideline for using Pydantic and Elasticsearch together. It streamlines the process of data validation and indexing while minimizing the risk of data discrepancies and errors. This combination is particularly useful in data-intensive applications where the quality and structure of data play a crucial role.

Conclusion

Together, Pydantic and Elasticsearch offer a powerful solution for data management. By leveraging Pydantic, we can implement automatic validation, serialization, and documentation for complex data schemas, keeping our data consistent and well structured.

Elasticsearch, on the other hand, is a potent, flexible, and scalable search and analytics engine that can handle a considerable volume of data swiftly and efficiently; however, creating an index with the correct mappings can be a complicated process, especially for nested and complex structures.

The method illustrated in this article demonstrates how Pydantic can be used to simplify this process. By defining a Pydantic model that mirrors our desired Elasticsearch index structure, we can automate the generation of Elasticsearch index mappings, thereby reducing human error and enhancing the accuracy of data mapping. Furthermore, by utilizing Pydantic's validation capabilities, we ensure the quality and integrity of the data that we index into Elasticsearch.

Though our focus has been on a specific application involving Book data, the method is widely applicable to any scenario where you are working with structured data and Elasticsearch. With this approach, the benefits of data consistency, efficient search, and automated validation are realized, reinforcing the value of integrating these two powerful tools.
