
How Milvus Deletes Streaming Data in a Distributed Cluster

The cardinal design behind the deletion function in Milvus 2.0, the world's most advanced vector database.

By Jun Gu · Feb. 27, 22 · Big Data Zone · Analysis

With its unified batch-and-stream processing and cloud-native architecture, Milvus 2.0 made the DELETE function harder to develop than it was in its predecessor. Thanks to the storage-computation disaggregation design and the flexible publication/subscription mechanism, we are proud to announce that we made it happen. In Milvus 2.0, you can delete an entity from a given collection by its primary key, so that the deleted entity no longer appears in the results of a search or a query.

Please note that the DELETE operation in Milvus refers to logical deletion, whereas physical data cleanup occurs during Data Compaction. Logical deletion not only greatly boosts search performance, which is constrained by I/O speed, but also facilitates data recovery: logically deleted data can still be retrieved with the help of the Time Travel function.
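The difference between logical deletion, compaction, and a time-travel read can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `LogicalStore`, its methods, and the integer timestamps are hypothetical names invented for this example and are not part of Milvus or PyMilvus.

```python
class LogicalStore:
    """Toy model of logical deletion (hypothetical, not Milvus code)."""

    def __init__(self):
        self.rows = {}        # primary key -> insert timestamp
        self.tombstones = {}  # primary key -> delete timestamp

    def insert(self, pk, ts):
        self.rows[pk] = ts

    def delete(self, pk, ts):
        # Logical delete: only record a tombstone, keep the row itself
        if pk in self.rows:
            self.tombstones[pk] = ts

    def visible(self, ts):
        """Return the primary keys visible to a read at timestamp ts."""
        result = []
        for pk, inserted_at in self.rows.items():
            deleted_at = self.tombstones.get(pk)
            if inserted_at <= ts and (deleted_at is None or deleted_at > ts):
                result.append(pk)
        return sorted(result)

    def compact(self):
        # Physical cleanup: drop rows that carry a tombstone
        for pk in list(self.tombstones):
            del self.rows[pk]
        self.tombstones.clear()


store = LogicalStore()
store.insert(1, ts=10)
store.insert(2, ts=10)
store.delete(2, ts=20)

before_delete = store.visible(15)   # read "in the past": entity 2 is still there
after_delete = store.visible(25)    # read after the delete: entity 2 is hidden
store.compact()
after_compaction = store.visible(15)  # once compacted, entity 2 is gone for good
```

In this model, a read at an earlier timestamp still sees the deleted entity until compaction physically removes it, which mirrors the recovery property described above.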

Usage

Let's try out the DELETE function in Milvus 2.0 first. (The following example uses PyMilvus 2.0.0 on Milvus 2.0.0.)

Python
 
from pymilvus import connections, utility, Collection, DataType, FieldSchema, CollectionSchema
# Connect to Milvus
connections.connect(
    alias="default", 
    host='x.x.x.x', 
    port='19530'
)
# Create a collection with Strong Consistency level
pk_field = FieldSchema(
    name="id", 
    dtype=DataType.INT64, 
    is_primary=True, 
)
vector_field = FieldSchema(
    name="vector", 
    dtype=DataType.FLOAT_VECTOR, 
    dim=2
)
schema = CollectionSchema(
    fields=[pk_field, vector_field], 
    description="Test delete"
)
collection_name = "test_delete"
collection = Collection(
    name=collection_name, 
    schema=schema, 
    using='default', 
    shards_num=2,
    consistency_level="Strong"
)
# Insert randomly generated vectors
import random
data = [
    [i for i in range(100)],
    [[random.random() for _ in range(2)] for _ in range(100)],
]
collection.insert(data)
# Query to make sure the entities to delete exist
collection.load()
expr = "id in [2,4,6,8,10]"
pre_del_res = collection.query(
    expr,
    output_fields = ["id", "vector"]
)
print(pre_del_res)
# Delete the entities with the previous expression
collection.delete(expr)
# Query again to check if the deleted entities exist
post_del_res = collection.query(
    expr,
    output_fields = ["id", "vector"]
)
print(post_del_res)


Implementation

In a Milvus instance, a data node is mainly responsible for packing streaming data (logs in log broker) as historical data (log snapshots) and automatically flushing them to object storage. A query node executes search requests on full data, i.e. both streaming data and historical data.

To make the most of the data writing capacity of parallel nodes in a cluster, Milvus adopts a sharding strategy based on primary key hashing to distribute write operations evenly across worker nodes. That is to say, the proxy routes all Data Manipulation Language (DML) messages (i.e. requests) for a given entity to the same data node and query node. These messages are published through the DML-Channel and consumed separately by the data node and the query node, which together provide search and query services.
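The routing rule can be sketched in a few lines. This is a minimal illustration rather than the actual Milvus implementation: the choice of CRC32 as the hash and the helper name `route_to_shard` are assumptions made for the example.

```python
import zlib

SHARDS_NUM = 2  # same value as shards_num in the usage example


def route_to_shard(primary_key: int, shards_num: int = SHARDS_NUM) -> int:
    """Map a primary key to a shard index with a stable hash (hypothetical helper)."""
    digest = zlib.crc32(str(primary_key).encode("utf-8"))
    return digest % shards_num


# An INSERT and a later DELETE for the same primary key hash to the same shard,
# so both messages travel through the same DML-Channel.
insert_shard = route_to_shard(42)
delete_shard = route_to_shard(42)
```

Because the hash depends only on the primary key, the node that holds an entity is also the node that receives the request to delete it.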

Data Node

Having received data INSERT messages, the data node inserts the data into a growing segment, which is a new segment created in memory to receive streaming data. If either the row count or the age of the growing segment reaches its threshold, the data node seals the segment so that it accepts no further data. The data node then flushes the sealed segment, which contains the historical data, to object storage. Meanwhile, the data node generates a bloom filter based on the primary keys of the new data and flushes it to object storage together with the sealed segment, saving the bloom filter as part of the statistics binary log (binlog), which contains the statistical information of the segment.
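The sealing rule described above (row count or age, whichever threshold is reached first) might look like the following sketch. The class `GrowingSegment`, its field names, and the threshold values are all hypothetical; the real thresholds are Milvus configuration values that this article does not state.

```python
import time
from dataclasses import dataclass, field


@dataclass
class GrowingSegment:
    max_rows: int = 4                 # hypothetical row-count threshold
    max_age_seconds: float = 600.0    # hypothetical age threshold
    created_at: float = field(default_factory=time.monotonic)
    primary_keys: list = field(default_factory=list)
    sealed: bool = False

    def insert(self, pk: int) -> bool:
        """Append a row; return False if the segment no longer accepts data."""
        if self.sealed:
            return False
        self.primary_keys.append(pk)
        self._maybe_seal()
        return True

    def _maybe_seal(self) -> None:
        too_big = len(self.primary_keys) >= self.max_rows
        too_old = time.monotonic() - self.created_at >= self.max_age_seconds
        if too_big or too_old:
            self.sealed = True  # a sealed segment is ready to be flushed


segment = GrowingSegment(max_rows=3)
accepted = [segment.insert(pk) for pk in range(5)]
```

Here the third insert fills the segment and seals it, so the fourth and fifth inserts are rejected and would go to a new growing segment.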

A bloom filter is a probabilistic data structure that consists of a long binary vector and a series of random mapping functions. It can be used to test whether an element is a member of a set, but might return false positive matches. (Source: Wikipedia)
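For readers unfamiliar with the structure, here is a minimal bloom filter over integer primary keys. It is a teaching sketch, not the filter Milvus uses: the bit-array size, the number of hash functions, and the use of salted SHA-256 as the "random mapping functions" are assumptions chosen for brevity.

```python
import hashlib


class BloomFilter:
    """Minimal bloom filter sketch (parameters are illustrative assumptions)."""

    def __init__(self, size_bits: int = 1024, num_hashes: int = 3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int serves as the binary vector

    def _positions(self, key: int):
        # Derive num_hashes bit positions by salting the key with a seed
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key: int) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: int) -> bool:
        # True means "possibly present"; False means "definitely absent"
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


pk_filter = BloomFilter()
for pk in [2, 4, 6, 8, 10]:
    pk_filter.add(pk)

hits = [pk_filter.might_contain(pk) for pk in [2, 4, 6, 8, 10]]
```

Every key that was added is always reported as possibly present. A key that was never added is usually rejected, but can occasionally pass as a false positive, which is why a bloom filter match only narrows down the candidate segments.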

When data DELETE messages come in, the data node buffers all bloom filters in the corresponding shard and matches them against the primary keys provided in the messages to retrieve all segments (both growing and sealed) that may contain the entities to delete. Having pinpointed the corresponding segments, the data node buffers them in memory, generates Delta binlogs that record the delete operations, and then flushes those binlogs together with the segments back to object storage.
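The matching step can be sketched as follows. To keep the example short, a plain Python `set` stands in for each segment's bloom filter, so there are no false positives here, unlike with a real bloom filter. `Segment`, `delta_log`, and `apply_delete` are hypothetical names, not Milvus identifiers.

```python
from dataclasses import dataclass, field


@dataclass
class Segment:
    segment_id: int
    pk_filter: set  # stand-in for the segment's bloom filter
    delta_log: list = field(default_factory=list)  # recorded (pk, timestamp) deletes


def apply_delete(segments, primary_keys, timestamp):
    """Record each delete in every segment whose filter may contain the key."""
    touched = set()
    for pk in primary_keys:
        for seg in segments:
            if pk in seg.pk_filter:
                seg.delta_log.append((pk, timestamp))
                touched.add(seg.segment_id)
    return sorted(touched)


segments = [
    Segment(1, {1, 2, 3}),
    Segment(2, {4, 5, 6}),
    Segment(3, {7, 8, 9}),
]
touched = apply_delete(segments, [2, 4], timestamp=100)
```

Only the segments that might hold a deleted key receive a delta record; the third segment is left untouched and does not need to be rewritten.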

[Figure: DELETE workflow in data node]


Since each shard is assigned only one DML-Channel, extra query nodes added to the cluster cannot subscribe to the DML-Channel. To ensure that all query nodes receive the DELETE messages, data nodes filter the DELETE messages out of the DML-Channel and forward them to the Delta-Channel, which notifies all query nodes of the delete operations.
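The forwarding step amounts to a filter over the message stream. In the sketch below, two in-memory queues stand in for the DML-Channel and the Delta-Channel; the message layout and the function `forward_deletes` are assumptions made for illustration, not the actual message format.

```python
from collections import deque

# In-memory stand-ins for the two channels (hypothetical message layout)
dml_channel = deque([
    {"type": "INSERT", "pk": 1},
    {"type": "DELETE", "pk": 1},
    {"type": "INSERT", "pk": 2},
    {"type": "DELETE", "pk": 2},
])
delta_channel = deque()


def forward_deletes(dml, delta):
    """Consume DML messages and copy only the DELETE ones to the delta channel."""
    forwarded = 0
    while dml:
        message = dml.popleft()
        if message["type"] == "DELETE":
            delta.append(message)
            forwarded += 1
    return forwarded


forwarded = forward_deletes(dml_channel, delta_channel)
forwarded_pks = [message["pk"] for message in delta_channel]
```

Any number of query nodes could then read from the delta queue, which is the point of the extra channel: DELETE messages reach nodes that have no DML-Channel subscription.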

Query Node

When loading a collection from object storage, the query node first obtains each shard's checkpoint, which marks the DML operations since the last flush operation. Based on the checkpoint, the query node loads all sealed segments together with their Delta binlogs and bloom filters. With all data loaded, the query node then subscribes to the DML-Channel, the Delta-Channel, and the Query-Channel.

If more data INSERT messages come after the collection is loaded to memory, the query node first pinpoints the growing segments according to the messages, and updates corresponding bloom filters in memory for query purposes only. Those query-dedicated bloom filters will not be flushed to object storage after the query is finished.

[Figure: DELETE workflow in query node]


Query nodes that cannot subscribe to the DML-Channel can only subscribe to the Delta-Channel and receive the DELETE messages forwarded by data nodes, so they are only allowed to process search or query requests on sealed segments. Having collected all DELETE messages for the sealed segments from the Delta-Channel, these query nodes locate the entities by matching the provided primary keys against the bloom filters of the sealed segments, and then record the delete operations in the corresponding segments.

Eventually, in a search or query, the query nodes generate a bitset based on the delete records to omit the deleted entities, and search among the remaining entities from all segments, regardless of the segment status. Last but not least, the consistency level affects the visibility of the deleted data. Under the Strong consistency level (as shown in the previous code sample), deleted entities become invisible immediately after deletion. When the Bounded consistency level is adopted, there is a latency of several seconds before the deleted entities become invisible.
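The bitset step mentioned above can be pictured as one boolean per row, set when the row has a delete record. The sketch below is a simplified model: `build_delete_bitset` and `search_visible` are hypothetical helper names, and a Python list of booleans stands in for a real packed bitset.

```python
def build_delete_bitset(segment_pks, deleted_pks):
    """Return one flag per row: True if the row has a delete record."""
    deleted = set(deleted_pks)
    return [pk in deleted for pk in segment_pks]


def search_visible(segment_pks, bitset):
    """Skip every row whose bit is set and return the remaining primary keys."""
    return [pk for pk, is_deleted in zip(segment_pks, bitset) if not is_deleted]


segment_pks = list(range(1, 11))
bitset = build_delete_bitset(segment_pks, deleted_pks=[2, 4, 6, 8, 10])
visible = search_visible(segment_pks, bitset)
```

The rows themselves are never modified by the search; the bitset simply masks them out, which is consistent with deletion being logical until compaction runs.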

In upcoming articles in this series, we will introduce the design of Data Compaction, Dynamic Load Balance, and Bitset in Milvus 2.0. Please stay tuned.


Published at DZone with permission of Jun Gu. See the original article here.
