Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. It uses Python as its programming language and offers a flexible architecture suited for both small-scale and large-scale data processing. The platform supports the concept of Directed Acyclic Graphs (DAGs) to define workflows, making it easy to visualize complex data pipelines. One of the key features of Apache Airflow is its ability to schedule and trigger batch jobs, making it a popular choice for processing large volumes of data. It provides excellent support for integrating with data processing technologies and frameworks such as Apache Hadoop and Apache Spark. By using Apache Airflow for batch processing, you can easily define and schedule your data processing tasks, ensuring that they are executed in the desired order and within the specified time constraints.

Batch processing is a common approach in big data that involves processing data in large volumes, typically at regular time intervals. It is well suited for scenarios where data can be collected over a period and processed together as a batch. Within the fintech sector, batch processing covers a wide range of applications, including authorization and settlement, recurring payment management, reconciliation, fraud detection and analytics, regulatory compliance, and changes to customer relationship management systems.

Let's explore a simple use case of processing an input file and writing back to an output file using Apache Airflow. To get started with Apache Airflow, you can follow the official documentation for installation and setup.

[Overview diagram illustrating the basic flow of a batch processing scenario]

Setting the Stage

Python

import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

The script begins by importing the necessary modules (including logging, which the processing task below relies on) and defining default arguments for the DAG. These defaults include the DAG owner, start date, and retry settings.

Reading Function: Extracting Data

Python

def read_function(**kwargs):
    ti = kwargs["ti"]
    # Read from a file (example: input_file.txt)
    with open("path/to/file/input_file.txt", "r") as file:
        # Skip the CSV header row
        file.readline()
        # Read the remaining lines
        lines = file.readlines()
    # Push each line to XCom storage
    for i, line in enumerate(lines):
        ti.xcom_push(key=f"line_{i}", value=line.strip())
    # Push the total number of lines to XCom storage
    ti.xcom_push(key="num_lines", value=len(lines))

The read_function simulates the extraction of data by reading lines from a file (`input_file.txt`). It then uses Airflow's XCom feature to push each line and the total number of lines into storage, making them accessible to subsequent tasks.

Sample Input File

Plain Text

CardNumber,TransactionId,Amount,TxnType,Recurring,Date
1,123456789,100.00,Debit,Monthly,2023-12-31
2,987654321,50.00,Credit,Weekly,2023-10-15
3,456789012,75.50,Debit,Monthly,2023-11-30
4,555111222,120.75,Credit,Daily,2023-09-30

The input file above holds the recurring transactions to be processed.
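Since each XCom entry is a raw CSV record, a downstream task will usually want the individual fields. Purely as an illustration (the article itself keeps the records as plain strings), here is a minimal sketch of how one record could be parsed with Python's standard csv module; the field names simply mirror the header of the sample input file:

Python

import csv
import io

def parse_transaction(line: str) -> dict:
    # Map one CSV record onto the columns from the sample input file
    fields = ["CardNumber", "TransactionId", "Amount", "TxnType", "Recurring", "Date"]
    reader = csv.DictReader(io.StringIO(line), fieldnames=fields)
    return next(reader)

record = parse_transaction("1,123456789,100.00,Debit,Monthly,2023-12-31")
print(record["Amount"])  # -> "100.00"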
Processing Function: Transforming Data

Python

def process_function(**kwargs):
    ti = kwargs["ti"]
    # Pull all lines from XCom storage
    lines = [ti.xcom_pull(task_ids="read", key=f"line_{i}")
             for i in range(ti.xcom_pull(task_ids="read", key="num_lines"))]
    # Process and print all lines
    for i, line in enumerate(lines):
        logging.info(f"Make Payment Transaction {i + 1}: {line}")

The process_function pulls all lines from XCom storage and simulates the transformation step by logging each line. This task demonstrates the flexibility of Airflow in handling data flow between tasks. The process_function can have multiple implementations, allowing it to either invoke a web service call to execute the transaction or call another DAG to follow a different flow.

Logs

Plain Text

[2023-11-28, 03:49:06 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='batch_processing_dag' AIRFLOW_CTX_TASK_ID='process' AIRFLOW_CTX_EXECUTION_DATE='2023-11-28T03:48:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-11-28T03:48:00+00:00'
[2023-11-28, 03:49:06 UTC] {batch_processing_dag.py:38} INFO - Make Payment Transaction 1: 1,123456789,100.00,Debit,Monthly,2023-12-31
[2023-11-28, 03:49:06 UTC] {batch_processing_dag.py:38} INFO - Make Payment Transaction 2: 2,987654321,50.00,Credit,Weekly,2023-10-15
[2023-11-28, 03:49:06 UTC] {batch_processing_dag.py:38} INFO - Make Payment Transaction 3: 3,456789012,75.50,Debit,Monthly,2023-11-30
[2023-11-28, 03:49:06 UTC] {batch_processing_dag.py:38} INFO - Make Payment Transaction 4: 4,555111222,120.75,Credit,Daily,2023-09-30
[2023-11-28, 03:49:06 UTC] {python.py:194} INFO - Done. Returned value was: None

Writing Function: Loading Data

Python

def write_function(**kwargs):
    ti = kwargs["ti"]
    # Pull all lines from XCom storage
    lines = [ti.xcom_pull(task_ids="read", key=f"line_{i}")
             for i in range(ti.xcom_pull(task_ids="read", key="num_lines"))]
    # Write all lines to an output file (example: processed.txt)
    with open("path/to/file/processed.txt", "a") as file:
        for i, line in enumerate(lines):
            processed_line = f"{line.strip()} PROCESSED"
            file.write(f"{processed_line}\n")

The write_function pulls all lines from XCom storage and writes them to an output file (`processed.txt`).

Sample Output File After the Transactions Are Processed

Plain Text

1,123456789,100.00,Debit,Monthly,2023-12-31 PROCESSED
2,987654321,50.00,Credit,Weekly,2023-10-15 PROCESSED
3,456789012,75.50,Debit,Monthly,2023-11-30 PROCESSED
4,555111222,120.75,Credit,Daily,2023-09-30 PROCESSED

DAG Definition: Orchestrating the Workflow

Python

dag = DAG(
    'batch_processing_dag',
    default_args=default_args,
    description='DAG with Read, Process, and Write functions',
    schedule_interval='*/1 * * * *',  # Set the schedule interval according to your needs
    catchup=False,
)

The DAG is instantiated with the name batch_processing_dag, the previously defined default arguments, a description, a schedule interval (running every minute), and the catchup parameter set to False.
Task Definitions: Executing the Functions

Python

# Task to read from a file and push to XCom storage
read_task = PythonOperator(
    task_id='read',
    python_callable=read_function,
    provide_context=True,
    dag=dag,
)

# Task to process the data from XCom storage (print to console)
process_task = PythonOperator(
    task_id='process',
    python_callable=process_function,
    provide_context=True,
    dag=dag,
)

# Task to write the data back to an output file
write_task = PythonOperator(
    task_id='write',
    python_callable=write_function,
    provide_context=True,
    dag=dag,
)

Three tasks (read_task, process_task, and write_task) are defined using the PythonOperator. Each task is associated with one of the Python functions (read_function, process_function, and write_function). The provide_context=True parameter allows the functions to access the task instance and context information.

Defining Task Dependencies

Python

# Define task dependencies
read_task >> process_task >> write_task

The task dependencies are specified using the >> operator, indicating the order in which the tasks should be executed.

Conclusion

Apache Airflow proves to be a flexible open-source tool for managing workflows, especially batch processing. Features such as dynamic workflow definition, support for Directed Acyclic Graphs (DAGs), explicit task dependency management, built-in monitoring and logging, parallel execution, and robust error handling make it a strong choice for organizations of all sizes. The straightforward batch processing scenario above highlights Apache Airflow's approachable programming model and its adaptability to a range of data processing needs.
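As a side note (not part of the original walkthrough), a quick way to smoke-test a DAG like this without waiting for the scheduler is the DAG.test() helper available from Airflow 2.5 onward; appended to the bottom of the DAG file, it runs the three tasks in order in a single process:

Python

# Hypothetical local smoke test; assumes Airflow 2.5+ and valid input/output file paths.
if __name__ == "__main__":
    dag.test()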
The AIDocumentLibraryChat project uses the Spring AI project with OpenAI to search a document library for answers to questions. To do that, Retrieval Augmented Generation is used on the documents.

Retrieval Augmented Generation

The process looks like this:

Upload Document
- Store the document in the Postgresql DB.
- Split the document to create embeddings.
- Create the embeddings with a call to the OpenAI embedding model.
- Store the document embeddings in the Postgresql Vector DB.

Search Documents
- Create the search prompt.
- Create an embedding of the search prompt with a call to the OpenAI embedding model.
- Query the Postgresql Vector DB for the documents with the nearest embedding distances.
- Query the Postgresql DB for the document.
- Create a prompt with the search prompt and the document text chunk.
- Request an answer from the GPT model and show the answer based on the search prompt and the document text chunk.

Document Upload

The uploaded document is stored in the database to keep the source document of the answer. The document text has to be split into chunks to create embeddings per chunk. The embeddings are created by an OpenAI embedding model and are vectors with more than 1500 dimensions that represent the text chunk. The embedding is stored in an AIDocument with the chunk text and the id of the source document in the vector database.

Document Search

The document search takes the search prompt and uses the OpenAI embedding model to turn it into an embedding. That embedding is used to search the vector database for the nearest neighbor vector, that is, the text chunk whose embedding has the greatest similarity to the embedding of the search prompt. The id in the AIDocument is used to read the document from the relational database. With the search prompt and the text chunk of the AIDocument, the document prompt is created. Then, the OpenAI GPT model is called with the prompt to create an answer based on the search prompt and the document context. That causes the model to create answers that are closely based on the documents provided and improves the accuracy. The answer of the GPT model is returned and displayed with a link to the document to provide the source of the answer.

Architecture

The architecture of the project is built around Spring Boot with Spring AI. The Angular UI provides the user interface to show the document list, upload documents, and provide the search prompt with the answer and the source document. It communicates with the Spring Boot backend via the REST interface. The Spring Boot backend provides the REST controllers for the frontend and uses Spring AI to communicate with the OpenAI models and the Postgresql vector database. The documents are stored with JPA in the Postgresql relational database. Postgresql is used because it combines the relational database and the vector database in one Docker image.

Implementation

Frontend

The frontend is based on lazy loaded standalone components built with Angular. The lazy loaded standalone components are configured in app.config.ts:

TypeScript

export const appConfig: ApplicationConfig = {
  providers: [provideRouter(routes), provideAnimations(), provideHttpClient()]
};

The configuration sets the routes and enables the HTTP client and the animations.
The lazy loaded routes are defined in app.routes.ts:

TypeScript

export const routes: Routes = [
  {
    path: "doclist",
    loadChildren: () => import("./doc-list").then((mod) => mod.DOCLIST),
  },
  {
    path: "docsearch",
    loadChildren: () => import("./doc-search").then((mod) => mod.DOCSEARCH),
  },
  { path: "**", redirectTo: "doclist" },
];

In 'loadChildren' the 'import("...").then((mod) => mod.XXX)' loads the route lazily from the provided path and sets the exported routes defined in the 'mod.XXX' constant. The lazy loaded route 'docsearch' has an index.ts to export the constant:

TypeScript

export * from "./doc-search.routes";

That exports doc-search.routes.ts:

TypeScript

export const DOCSEARCH: Routes = [
  {
    path: "",
    component: DocSearchComponent,
  },
  { path: "**", redirectTo: "" },
];

It defines the routing to the 'DocSearchComponent'. The file upload can be found in the DocImportComponent with the template doc-import.component.html:

HTML

<h1 mat-dialog-title i18n="@@docimportImportFile">Import file</h1>
<div mat-dialog-content>
  <p i18n="@@docimportFileToImport">File to import</p>
  @if(uploading) {
    <div class="upload-spinner"><mat-spinner></mat-spinner></div>
  } @else {
    <input type="file" (change)="onFileInputChange($event)">
  }
  @if(!!file) {
    <div>
      <ul>
        <li>Name: {{file.name}}</li>
        <li>Type: {{file.type}}</li>
        <li>Size: {{file.size}} bytes</li>
      </ul>
    </div>
  }
</div>
<div mat-dialog-actions>
  <button mat-button (click)="cancel()" i18n="@@cancel">Cancel</button>
  <button mat-flat-button color="primary" [disabled]="!file || uploading"
    (click)="upload()" i18n="@@docimportUpload">Upload</button>
</div>

The file upload is done with the '<input type="file" (change)="onFileInputChange($event)">' tag. It provides the upload feature and calls the 'onFileInputChange(...)' method after each file selection. The 'Upload' button calls the 'upload()' method to send the file to the server on click. The doc-import.component.ts has the methods for the template:

TypeScript

@Component({
  selector: 'app-docimport',
  standalone: true,
  imports: [CommonModule, MatFormFieldModule, MatDialogModule, MatButtonModule,
    MatInputModule, FormsModule, MatProgressSpinnerModule],
  templateUrl: './doc-import.component.html',
  styleUrls: ['./doc-import.component.scss']
})
export class DocImportComponent {
  protected file: File | null = null;
  protected uploading = false;
  private destroyRef = inject(DestroyRef);

  constructor(private dialogRef: MatDialogRef<DocImportComponent>,
    @Inject(MAT_DIALOG_DATA) public data: DocImportComponent,
    private documentService: DocumentService) { }

  protected onFileInputChange($event: Event): void {
    const files = !$event.target ? null : ($event.target as HTMLInputElement).files;
    this.file = !!files && files.length > 0 ? files[0] : null;
  }

  protected upload(): void {
    if(!!this.file) {
      const formData = new FormData();
      formData.append('file', this.file as Blob, this.file.name as string);
      this.documentService.postDocumentForm(formData)
        .pipe(tap(() => {this.uploading = true;}),
          takeUntilDestroyed(this.destroyRef))
        .subscribe(result => {this.uploading = false; this.dialogRef.close();});
    }
  }

  protected cancel(): void {
    this.dialogRef.close();
  }
}

This is the standalone component with its module imports and the injected 'DestroyRef'. The 'onFileInputChange(...)' method takes the event parameter and stores its 'files' property in the 'files' constant. Then it checks for the first file and stores it in the 'file' component property. The 'upload()' method checks for the 'file' property and creates the 'FormData()' for the file upload.
The 'formData' constant has the form field name ('file'), the content ('this.file'), and the filename ('this.file.name') appended. Then the 'documentService' is used to post the 'FormData()' object to the server. The 'takeUntilDestroyed(this.destroyRef)' function unsubscribes the RxJS pipeline after the component is destroyed. That makes unsubscribing pipelines very convenient in Angular.

Backend

The backend is a Spring Boot application with the Spring AI framework. Spring AI manages the requests to the OpenAI models and the vector database requests.

Liquibase Database Setup

The database setup is done with Liquibase and the script can be found in db.changelog-1.xml:

XML

<databaseChangeLog
  xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
    http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">
  <changeSet id="1" author="angular2guy">
    <sql>CREATE EXTENSION if not exists hstore;</sql>
  </changeSet>
  <changeSet id="2" author="angular2guy">
    <sql>CREATE EXTENSION if not exists vector;</sql>
  </changeSet>
  <changeSet id="3" author="angular2guy">
    <sql>CREATE EXTENSION if not exists "uuid-ossp";</sql>
  </changeSet>
  <changeSet author="angular2guy" id="4">
    <createTable tableName="document">
      <column name="id" type="bigint">
        <constraints primaryKey="true"/>
      </column>
      <column name="document_name" type="varchar(255)">
        <constraints notNullConstraintName="document_document_name_notnull" nullable="false"/>
      </column>
      <column name="document_type" type="varchar(25)">
        <constraints notNullConstraintName="document_document_type_notnull" nullable="false"/>
      </column>
      <column name="document_content" type="blob"/>
    </createTable>
  </changeSet>
  <changeSet author="angular2guy" id="5">
    <createSequence sequenceName="document_seq" incrementBy="50" startValue="1000" />
  </changeSet>
  <changeSet id="6" author="angular2guy">
    <createTable tableName="vector_store">
      <column name="id" type="uuid" defaultValueComputed="uuid_generate_v4 ()">
        <constraints primaryKey="true"/>
      </column>
      <column name="content" type="text"/>
      <column name="metadata" type="json"/>
      <column name="embedding" type="vector(1536)">
        <constraints notNullConstraintName="vectorstore_embedding_type_notnull" nullable="false"/>
      </column>
    </createTable>
  </changeSet>
  <changeSet id="7" author="angular2guy">
    <sql>CREATE INDEX vectorstore_embedding_index ON vector_store USING HNSW (embedding vector_cosine_ops);</sql>
  </changeSet>
</databaseChangeLog>

In changeset 4 the table for the JPA document entity is created with the primary key 'id'. The content type/size is unknown, and because of that the content column is set to 'blob'. In changeset 5 the sequence for the JPA entity is created with the default properties of the Hibernate 6 sequences that are used by Spring Boot 3.x. In changeset 6 the table 'vector_store' is created with a primary key 'id' of type 'uuid' that is created by the 'uuid-ossp' extension. The column 'content' is of type 'text' ('clob' in other databases) to have a flexible size. The 'metadata' column stores the metadata of the AIDocuments in a 'json' type. The 'embedding' column stores the embedding vector with the number of OpenAI dimensions. In changeset 7 the index for the fast search of the 'embedding' column is created. Because the Liquibase '<createIndex ...>' tag supports only limited parameters, '<sql>' is used directly to create it.
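The HNSW index from changeset 7 is what makes the nearest-neighbor lookup fast. Spring AI's PgVectorStore issues that query for you (as shown later), but for intuition, a rough standalone sketch of an equivalent cosine-distance query against the vector_store table might look like the following; the connection settings and the query embedding are placeholders, and it assumes the psycopg2 driver plus pgvector's '<=>' cosine-distance operator:

Python

import psycopg2  # assumption: the psycopg2 driver is available

# Placeholder connection settings and a placeholder 1536-dimensional query embedding
conn = psycopg2.connect(host="localhost", dbname="postgres", user="postgres", password="postgres")
query_embedding = "[" + ",".join(["0.0"] * 1536) + "]"  # normally produced by the embedding model

with conn, conn.cursor() as cur:
    # '<=>' is pgvector's cosine distance operator, matching the vector_cosine_ops index above
    cur.execute(
        """
        SELECT content, metadata, embedding <=> %s::vector AS distance
        FROM vector_store
        ORDER BY distance
        LIMIT 5
        """,
        (query_embedding,),
    )
    for content, metadata, distance in cur.fetchall():
        print(distance, metadata)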
Spring Boot / Spring AI Implementation

The DocumentController for the frontend looks like this:

Java

@RestController
@RequestMapping("rest/document")
public class DocumentController {
  private final DocumentMapper documentMapper;
  private final DocumentService documentService;

  public DocumentController(DocumentMapper documentMapper, DocumentService documentService) {
    this.documentMapper = documentMapper;
    this.documentService = documentService;
  }

  @PostMapping("/upload")
  public long handleDocumentUpload(@RequestParam("file") MultipartFile document) {
    var docSize = this.documentService
      .storeDocument(this.documentMapper.toEntity(document));
    return docSize;
  }

  @GetMapping("/list")
  public List<DocumentDto> getDocumentList() {
    return this.documentService.getDocumentList().stream()
      .flatMap(myDocument -> Stream.of(this.documentMapper.toDto(myDocument)))
      .flatMap(myDocument -> {
        myDocument.setDocumentContent(null);
        return Stream.of(myDocument);
      }).toList();
  }

  @GetMapping("/doc/{id}")
  public ResponseEntity<DocumentDto> getDocument(@PathVariable("id") Long id) {
    return ResponseEntity.ofNullable(this.documentService
      .getDocumentById(id).stream().map(this.documentMapper::toDto)
      .findFirst().orElse(null));
  }

  @GetMapping("/content/{id}")
  public ResponseEntity<byte[]> getDocumentContent(@PathVariable("id") Long id) {
    var resultOpt = this.documentService.getDocumentById(id).stream()
      .map(this.documentMapper::toDto).findFirst();
    var result = resultOpt.stream().map(this::toResultEntity)
      .findFirst().orElse(ResponseEntity.notFound().build());
    return result;
  }

  private ResponseEntity<byte[]> toResultEntity(DocumentDto documentDto) {
    var contentType = switch (documentDto.getDocumentType()) {
      case DocumentType.PDF -> MediaType.APPLICATION_PDF;
      case DocumentType.HTML -> MediaType.TEXT_HTML;
      case DocumentType.TEXT -> MediaType.TEXT_PLAIN;
      case DocumentType.XML -> MediaType.APPLICATION_XML;
      default -> MediaType.ALL;
    };
    return ResponseEntity.ok().contentType(contentType)
      .body(documentDto.getDocumentContent());
  }

  @PostMapping("/search")
  public DocumentSearchDto postDocumentSearch(@RequestBody SearchDto searchDto) {
    var result = this.documentMapper
      .toDto(this.documentService.queryDocuments(searchDto));
    return result;
  }
}

The 'handleDocumentUpload(...)' method handles the uploaded file with the 'documentService' at the '/rest/document/upload' path. The 'getDocumentList()' method handles the get requests for the document list and removes the document content to save on the response size. The 'getDocumentContent(...)' method handles the get requests for the document content. It loads the document with the 'documentService' and maps the 'DocumentType' to the 'MediaType'. Then it returns the content and the content type, and the browser opens the content based on the content type. The 'postDocumentSearch(...)' method puts the request content in the 'SearchDto' object and returns the AI generated result of the 'documentService.queryDocuments(...)' call.
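For orientation, the two most important endpoints above can be exercised with any HTTP client. The following is a small illustrative sketch using Python's requests library; the host/port and the JSON property names of the SearchDto ('searchString', 'searchType') are assumptions derived from the controller and service code, not documented API details:

Python

import requests  # assumption: the backend runs locally on port 8080

BASE = "http://localhost:8080/rest/document"

# Upload a document; the multipart field is named 'file', as in @RequestParam("file")
with open("example.pdf", "rb") as f:
    doc_size = requests.post(f"{BASE}/upload", files={"file": f}).json()
print("returned document size:", doc_size)

# Ask a question against the uploaded documents (property names assumed from the SearchDto getters)
search = {"searchString": "What is the notice period?", "searchType": "DOCUMENT"}
answer = requests.post(f"{BASE}/search", json=search).json()
print(answer)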
The method 'storeDocument(...)' of the DocumentService looks like this:

Java

public Long storeDocument(Document document) {
  var myDocument = this.documentRepository.save(document);
  Resource resource = new ByteArrayResource(document.getDocumentContent());
  var tikaDocuments = new TikaDocumentReader(resource).get();
  record TikaDocumentAndContent(org.springframework.ai.document.Document document, String content) { }
  var aiDocuments = tikaDocuments.stream()
    .flatMap(myDocument1 -> this.splitStringToTokenLimit(
        myDocument1.getContent(), CHUNK_TOKEN_LIMIT)
      .stream().map(myStr -> new TikaDocumentAndContent(myDocument1, myStr)))
    .map(myTikaRecord -> new org.springframework.ai.document.Document(
      myTikaRecord.content(), myTikaRecord.document().getMetadata()))
    .peek(myDocument1 -> myDocument1.getMetadata()
      .put(ID, myDocument.getId().toString())).toList();
  LOGGER.info("Name: {}, size: {}, chunks: {}", document.getDocumentName(),
    document.getDocumentContent().length, aiDocuments.size());
  this.documentVsRepository.add(aiDocuments);
  return Optional.ofNullable(myDocument.getDocumentContent()).stream()
    .map(myContent -> Integer.valueOf(myContent.length).longValue())
    .findFirst().orElse(0L);
}

private List<String> splitStringToTokenLimit(String documentStr, int tokenLimit) {
  List<String> splitStrings = new ArrayList<>();
  var tokens = new StringTokenizer(documentStr).countTokens();
  var chunks = Math.ceilDiv(tokens, tokenLimit);
  if (chunks == 0) {
    return splitStrings;
  }
  var chunkSize = Math.ceilDiv(documentStr.length(), chunks);
  var myDocumentStr = new String(documentStr);
  while (!myDocumentStr.isBlank()) {
    splitStrings.add(myDocumentStr.length() > chunkSize ?
      myDocumentStr.substring(0, chunkSize) : myDocumentStr);
    myDocumentStr = myDocumentStr.length() > chunkSize ?
      myDocumentStr.substring(chunkSize) : "";
  }
  return splitStrings;
}

The 'storeDocument(...)' method saves the document to the relational database. Then, the document is converted into a 'ByteArrayResource' and read with the 'TikaDocumentReader' of Spring AI to turn it into an AIDocument list. The AIDocument list is then flatmapped to split the documents into chunks with the 'splitStringToTokenLimit(...)' method; the chunks are turned into new AIDocuments with the 'id' of the stored document in the metadata map. The 'id' in the metadata enables loading the matching document entity for the AIDocuments. Then the embeddings for the AIDocuments are created implicitly with the call to the 'documentVsRepository.add(...)' method, which calls the OpenAI embedding model and stores the AIDocuments with the embeddings in the vector database. Then the result is returned.

The method 'queryDocuments(...)' looks like this:

Java

public AiResult queryDocuments(SearchDto searchDto) {
  var similarDocuments = this.documentVsRepository
    .retrieve(searchDto.getSearchString());
  var mostSimilar = similarDocuments.stream()
    .sorted((myDocA, myDocB) -> ((Float) myDocA.getMetadata().get(DISTANCE))
      .compareTo(((Float) myDocB.getMetadata().get(DISTANCE)))).findFirst();
  var documentChunks = mostSimilar.stream().flatMap(mySimilar ->
    similarDocuments.stream().filter(mySimilar1 ->
      mySimilar1.getMetadata().get(ID).equals(
        mySimilar.getMetadata().get(ID)))).toList();
  Message systemMessage = switch (searchDto.getSearchType()) {
    case SearchDto.SearchType.DOCUMENT -> this.getSystemMessage(
      documentChunks, (documentChunks.size() <= 0 ?
        2000 : Math.floorDiv(2000, documentChunks.size())));
    case SearchDto.SearchType.PARAGRAPH ->
      this.getSystemMessage(mostSimilar.stream().toList(), 2000);
  };
  UserMessage userMessage = new UserMessage(searchDto.getSearchString());
  Prompt prompt = new Prompt(List.of(systemMessage, userMessage));
  LocalDateTime start = LocalDateTime.now();
  AiResponse response = aiClient.generate(prompt);
  LOGGER.info("AI response time: {}ms",
    ZonedDateTime.of(LocalDateTime.now(), ZoneId.systemDefault()).toInstant().toEpochMilli()
      - ZonedDateTime.of(start, ZoneId.systemDefault()).toInstant().toEpochMilli());
  var documents = mostSimilar.stream().map(myGen -> myGen.getMetadata().get(ID))
    .filter(myId -> Optional.ofNullable(myId).stream()
      .allMatch(myId1 -> (myId1 instanceof String)))
    .map(myId -> Long.parseLong(((String) myId)))
    .map(this.documentRepository::findById)
    .filter(Optional::isPresent)
    .map(Optional::get).toList();
  return new AiResult(searchDto.getSearchString(), response.getGenerations(), documents);
}

private Message getSystemMessage(
    List<org.springframework.ai.document.Document> similarDocuments, int tokenLimit) {
  String documents = similarDocuments.stream()
    .map(entry -> entry.getContent())
    .filter(myStr -> myStr != null && !myStr.isBlank())
    .map(myStr -> this.cutStringToTokenLimit(myStr, tokenLimit))
    .collect(Collectors.joining("\n"));
  SystemPromptTemplate systemPromptTemplate = new SystemPromptTemplate(this.systemPrompt);
  Message systemMessage = systemPromptTemplate
    .createMessage(Map.of("documents", documents));
  return systemMessage;
}

private String cutStringToTokenLimit(String documentStr, int tokenLimit) {
  String cutString = new String(documentStr);
  while (tokenLimit < new StringTokenizer(cutString, " -.;,").countTokens()) {
    cutString = cutString.length() > 1000 ?
      cutString.substring(0, cutString.length() - 1000) : "";
  }
  return cutString;
}

The method first loads the documents best matching the 'searchDto.getSearchString()' from the vector database. To do that, the OpenAI embedding model is called to turn the search string into an embedding, and with that embedding the vector database is queried for the AIDocuments with the lowest distance (the distance between the vectors of the search embedding and the database embedding). Then the AIDocument with the lowest distance is stored in the 'mostSimilar' variable. After that, all the AIDocuments of the document chunks are collected by matching the document entity id in their metadata to the 'id' of the most similar AIDocument. The 'systemMessage' is created with the 'documentChunks' or the 'mostSimilar' content. The 'getSystemMessage(...)' method takes them, cuts the content chunks to a size that the OpenAI GPT models can handle, and returns the 'Message'. Then the 'systemMessage' and the 'userMessage' are turned into a 'prompt' that is sent with 'aiClient.generate(prompt)' to the OpenAI GPT model. After that, the AI answer is available and the document entity is loaded with the id from the metadata of the 'mostSimilar' AIDocument. The 'AiResult' is created with the search string, the GPT answer, and the document entity, and is returned.
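The content of 'this.systemPrompt' is not shown in the snippet above; it is a template with a '{documents}' placeholder that 'SystemPromptTemplate.createMessage(Map.of("documents", documents))' fills in. Purely as a hypothetical illustration (not the project's actual prompt), such a template could look like:

Plain Text

You are a helpful assistant. Answer the user's question using only the
information in the following documents. If the answer is not contained
in the documents, say that you do not know.

Documents:
{documents}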
The vector database repository DocumentVSRepositoryBean with the Spring AI 'VectorStore' looks like this:

Java

@Repository
public class DocumentVSRepositoryBean implements DocumentVsRepository {
  private final VectorStore vectorStore;

  public DocumentVSRepositoryBean(JdbcTemplate jdbcTemplate, EmbeddingClient embeddingClient) {
    this.vectorStore = new PgVectorStore(jdbcTemplate, embeddingClient);
  }

  public void add(List<Document> documents) {
    this.vectorStore.add(documents);
  }

  public List<Document> retrieve(String query, int k, double threshold) {
    return new VectorStoreRetriever(vectorStore, k, threshold).retrieve(query);
  }

  public List<Document> retrieve(String query) {
    return new VectorStoreRetriever(vectorStore).retrieve(query);
  }
}

The repository has the 'vectorStore' property that is used to access the vector database. It is created in the constructor from the injected parameters with the 'new PgVectorStore(...)' call. The PgVectorStore class is the Spring AI support for the Postgresql vector extension. It has the 'embeddingClient' to use the OpenAI embedding model and the 'jdbcTemplate' to access the database. The method 'add(...)' calls the OpenAI embedding model and adds the AIDocuments to the vector database. The 'retrieve(...)' methods query the vector database for the embeddings with the lowest distances.

Conclusion

Angular made the creation of the frontend easy. The standalone components with lazy loading have kept the initial load small. The Angular Material components have helped a lot with the implementation and are easy to use. Spring Boot with Spring AI has made the use of Large Language Models easy. Spring AI provides the framework to hide the creation of embeddings and provides an easy-to-use interface to store the AIDocuments in a vector database (several are supported). The creation of the embedding for the search prompt to load the nearest AIDocuments is also done for you, and the interface of the vector database is simple. The Spring AI prompt classes also make the creation of the prompt for the OpenAI GPT models easy. Calling the model is done with the injected 'aiClient', and the results are returned. Spring AI is a very good framework from the Spring team. There have been no problems with the experimental version. With Spring AI, Large Language Models are now easy to use on our own documents.
In this article, I will show you how to use Cloudera DataFlow powered by Apache NiFi to interact with IBM WatsonX.AI foundation large language models in real time. We can work with any of the foundation models such as Google FLAN T5 XXL or IBM Granite models. I’ll show you how easy it is to build a real-time data pipeline feeding your Slack-like and mobile applications questions directly to secure WatsonX.AI models running in IBM Cloud. We will handle all the security, management, lineage, and governance with Cloudera Data Flow. As part of decision-making, we can choose different WatsonX.AI models on the fly based on what type of prompt it is. For example, if we want to continue a sentence versus answering a question I can pick different models. For questions answering Google FLAN T5 XXL works well. If I want to continue sentences I would use one of the IBM Granite models. You will notice how amazingly fast the WatsonX.AI models return the results we need. I do some quick enrichment and transformation and then send them out their way to Cloudera Apache Kafka to be used for continuous analytics and distribution to many other applications, systems, platforms, and downstream consumers. We will also output our answers to the original requester which could be someone in a Slack channel or someone in an application. All of this happens in real-time, with no code, full governance, lineage, data management, and security at any scale and on any platform. The power of IBM and Cloudera together in private, public, and hybrid cloud environments for real-time data and AI is just getting started. Try it today. Step By Step Real-Time Flow First, in Slack, I type a question: “Q: What is a good way to integrate Generative AI and Apache NiFi?” NiFi Flow Top Once that question is typed, the Slack server sends these events to our registered service. This can be hosted anywhere publicly facing. (Click here for Slack API link) Slack API Once enabled, your server will start receiving JSON events for each Slack post. This is easy to receive and parse in NiFi. Cloudera DataFlow enables receiving secure HTTPS REST calls in the public cloud-hosted edition with ease, even in Designer mode. NiFi Top Flow 2 In the first part of the flow, we received the REST JSON Post, which is as follows. Slackbot 1.0 (+https://api.slack.com/robots) application/json POST HTTP/1.1 { "token" : "qHvJe59yetAp1bao6wmQzH0C", "team_id" : "T1SD6MZMF", "context_team_id" : "T1SD6MZMF", "context_enterprise_id" : null, "api_app_id" : "A04U64MN9HS", "event" : { "type" : "message", "subtype" : "bot_message", "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 20 This is a very rich detailed JSON file that we could push immediately raw to an Apache Iceberg Open Cloud Lakehouse, a Kafka topic, or an object store as a JSON document (Enhancement Option). I am just going to parse what I need. EvaluateJSONPath We parse out the channel ID and plain text of the post. I only want messages from general (“C1SD6N197”). Then I copy the texts to an inputs field as is required for Hugging Face. We check our input: if it’s stocks or weather (more to come) we avoid calling the LLM. 
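Before looking at the routing queries that follow, here is a small Python sketch of what the EvaluateJSONPath parsing and the input check amount to. The $.event.channel and $.event.text paths and the 'channel' field are assumptions based on Slack's standard message event payload, since the JSON excerpt above is truncated; the channel ID matches the "general" channel mentioned in the text:

Python

import json
from typing import Optional, Tuple

def route_slack_message(raw_event: str) -> Optional[Tuple[str, str]]:
    """Return (route, text) for a Slack message event, mirroring the NiFi parsing and routing."""
    event = json.loads(raw_event).get("event", {})
    # Equivalent of EvaluateJSONPath on $.event.channel and $.event.text
    if event.get("channel") != "C1SD6N197":  # only handle messages from the general channel
        return None
    text = event.get("text", "")
    upper = text.upper()
    # Same routing idea as the QueryRecord SQL shown next
    if "WEATHER" in upper:
        return ("weather", text)
    if "STOCK" in upper:
        return ("stock", text)
    if upper.startswith(("QUESTION:", "Q:")):
        return ("llm", text)
    return ("ignore", text)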
SELECT * FROM FLOWFILE WHERE upper(inputs) like '%WEATHER%' AND not upper(inputs) like '%LLM SKIPPED%' SELECT * FROM FLOWFILE WHERE upper(inputs) like '%STOCK%' AND not upper(inputs) like '%LLM SKIPPED%' SELECT * FROM FLOWFILE WHERE (upper(inputs) like 'QUESTION:%' OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%' and not upper(inputs) like '%STOCK%' For Stocks processing: To parse what stock we need I am using my Open NLP processor to get it. So you will need to download the processor and the Entity extraction models. GitHub - tspannhw/nifi-nlp-processor: Apache NiFi NLP Processor Open NLP Example Apache NiFi Processor Then we pass that company name to an HTTP REST endpoint from AlphaVantage that converts the Company Name to Stock symbols. In free accounts, you only get a few calls a day, so if we fail we then bypass this step and try to just use whatever you passed in. Using RouteOnContent we filter an Error message out. Then we use a QueryRecord processor to convert from CSV to JSON and filter. SELECT name as companyName, symbol FROM FLOWFILE ORDER BY matchScore DESC LIMIT 1 We do a SplitRecord to ensure we are only one record. We then run EvaluateJsonPath to get our fields as attributes. In an UpdateAttribute we trim the symbol just in case. ${stockSymbol:trim()} We then pass that stock symbol to Twelve Data via InvokeHTTP to get our stock data. We then get a lot of stock data back. { "meta" : { "symbol" : "IBM", "interval" : "1min", "currency" : "USD", "exchange_timezone" : "America/New_York", "exchange" : "NYSE", "mic_code" : "XNYS", "type" : "Common Stock" }, "values" : [ { "datetime" : "2023-11-15 10:37:00", "open" : "152.07001", "high" : "152.08000", "low" : "151.99500", "close" : "152.00999", "volume" : "8525" }, { "datetime" : "2023-11-15 10:36:00", "open" : "152.08501", "high" : "152.12250", "low" : "152.08000", "close" : "152.08501", "volume" : "15204" } ... We then run EvaluateJSONPath to grab the exchange information. We fork the record to just get one record as this is just to return to Slack. We use UpdateRecord calls to enrich the stock data with other values. We then run a QueryRecord to limit us to 1 record to send to Slack. SELECT * FROM FLOWFILE ORDER BY 'datetime' DESC LIMIT 1 We run an EvaluateJsonPath to get the most value fields to display. We then run a PutSlack with our message. LLM Skipped. Stock Value for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} is ${closeStockValue}. stock date ${stockdateTime}. stock exchange ${exchange} We also have a separate flow that is split from Company Name. In the first step, we call Yahoo Finance to get RSS headlines for that stock. https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}®ion=US&lang=en-US We use QueryRecord to convert RSS/XML Records to JSON. We then run a SplitJSON to break out the news items. We run a SplitRecord to limit to 1 record. We use EvaluateJSONPath to get the fields we need for our Slack message. We then run UpdateRecord to finalize our JSON. We then send this message to Slack. LLM Skipped. Stock News Information for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} ${title} : ${description}. ${guid} article date ${pubdate} For those who selected weather, we follow a similar route (we should add caching with Redis @ Aiven) to stocks. We use my OpenNLP processor to extract locations you might want to have weather on. The next step is taking the output of the processor and building a value to send to our Geoencoder. 
weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "New York City")} If we can’t find a valid location, I am going to say “New York City." We could use some other lookup. I am doing some work on loading all locations and could do some advanced PostgreSQL searches on that - or perhaps OpenSearch or a vectorized datastore. I pass that location to Open Meteo to find the geo via InvokeHTTP. https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json We then parse the values we need from the results. { "results" : [ { "id" : 5128581, "name" : "New York", "latitude" : 40.71427, "longitude" : -74.00597, "elevation" : 10.0, "feature_code" : "PPL", "country_code" : "US", "admin1_id" : 5128638, "timezone" : "America/New_York", "population" : 8175133, "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ], "country_id" : 6252001, "country" : "United States", "admin1" : "New York" } ], "generationtime_ms" : 0.92196465 } We then parse the results so we can call another API to get the current weather for that latitude and longitude via InvokeHTTP. https://api.weather.gov/points/${latitude:trim()},${longitude:trim()} The results are geo-json. 
{ "@context": [ "https://geojson.org/geojson-ld/geojson-context.jsonld", { "@version": "1.1", "wx": "https://api.weather.gov/ontology#", "s": "https://schema.org/", "geo": "http://www.opengis.net/ont/geosparql#", "unit": "http://codes.wmo.int/common/unit/", "@vocab": "https://api.weather.gov/ontology#", "geometry": { "@id": "s:GeoCoordinates", "@type": "geo:wktLiteral" }, "city": "s:addressLocality", "state": "s:addressRegion", "distance": { "@id": "s:Distance", "@type": "s:QuantitativeValue" }, "bearing": { "@type": "s:QuantitativeValue" }, "value": { "@id": "s:value" }, "unitCode": { "@id": "s:unitCode", "@type": "@id" }, "forecastOffice": { "@type": "@id" }, "forecastGridData": { "@type": "@id" }, "publicZone": { "@type": "@id" }, "county": { "@type": "@id" } } ], "id": "https://api.weather.gov/points/40.7143,-74.006", "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.006, 40.714300000000001 ] }, "properties": { "@id": "https://api.weather.gov/points/40.7143,-74.006", "@type": "wx:Point", "cwa": "OKX", "forecastOffice": "https://api.weather.gov/offices/OKX", "gridId": "OKX", "gridX": 33, "gridY": 35, "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast", "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly", "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35", "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations", "relativeLocation": { "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.0279259, 40.745251000000003 ] }, "properties": { "city": "Hoboken", "state": "NJ", "distance": { "unitCode": "wmoUnit:m", "value": 3906.1522008034999 }, "bearing": { "unitCode": "wmoUnit:degree_(angle)", "value": 151 } } }, "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072", "county": "https://api.weather.gov/zones/county/NYC061", "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212", "timeZone": "America/New_York", "radarStation": "KDIX" } } We use EvaluateJSONPath to grab a forecast URL. Then we call that forecast URL via invokeHTTP. That produces a larger JSON output that we will parse for the results we want to return to Slack. 
{ "@context": [ "https://geojson.org/geojson-ld/geojson-context.jsonld", { "@version": "1.1", "wx": "https://api.weather.gov/ontology#", "geo": "http://www.opengis.net/ont/geosparql#", "unit": "http://codes.wmo.int/common/unit/", "@vocab": "https://api.weather.gov/ontology#" } ], "type": "Feature", "geometry": { "type": "Polygon", "coordinates": [ [ [ -74.025095199999996, 40.727052399999998 ], [ -74.0295579, 40.705361699999997 ], [ -74.000948300000005, 40.701977499999998 ], [ -73.996479800000003, 40.723667899999995 ], [ -74.025095199999996, 40.727052399999998 ] ] ] }, "properties": { "updated": "2023-11-15T14:34:46+00:00", "units": "us", "forecastGenerator": "BaselineForecastGenerator", "generatedAt": "2023-11-15T15:11:39+00:00", "updateTime": "2023-11-15T14:34:46+00:00", "validTimes": "2023-11-15T08:00:00+00:00/P7DT17H", "elevation": { "unitCode": "wmoUnit:m", "value": 2.1335999999999999 }, "periods": [ { "number": 1, "name": "Today", "startTime": "2023-11-15T10:00:00-05:00", "endTime": "2023-11-15T18:00:00-05:00", "isDaytime": true, "temperature": 51, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 2.2222222222222223 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 68 }, "windSpeed": "1 to 7 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/day/bkn?size=medium", "shortForecast": "Partly Sunny", "detailedForecast": "Partly sunny, with a high near 51. Southwest wind 1 to 7 mph." }, { "number": 2, "name": "Tonight", "startTime": "2023-11-15T18:00:00-05:00", "endTime": "2023-11-16T06:00:00-05:00", "isDaytime": false, "temperature": 44, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 3.8888888888888888 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 82 }, "windSpeed": "8 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/night/sct?size=medium", "shortForecast": "Partly Cloudy", "detailedForecast": "Partly cloudy, with a low around 44. Southwest wind around 8 mph." }, { "number": 3, "name": "Thursday", "startTime": "2023-11-16T06:00:00-05:00", "endTime": "2023-11-16T18:00:00-05:00", "isDaytime": true, "temperature": 60, "temperatureUnit": "F", "temperatureTrend": "falling", "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 5.5555555555555554 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 82 }, "windSpeed": "6 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/day/few?size=medium", "shortForecast": "Sunny", "detailedForecast": "Sunny. High near 60, with temperatures falling to around 58 in the afternoon. Southwest wind around 6 mph." 
}, { "number": 4, "name": "Thursday Night", "startTime": "2023-11-16T18:00:00-05:00", "endTime": "2023-11-17T06:00:00-05:00", "isDaytime": false, "temperature": 47, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 6.1111111111111107 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 80 }, "windSpeed": "3 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/night/few?size=medium", "shortForecast": "Mostly Clear", "detailedForecast": "Mostly clear, with a low around 47. Southwest wind around 3 mph." }, { "number": 5, "name": "Friday", "startTime": "2023-11-17T06:00:00-05:00", "endTime": "2023-11-17T18:00:00-05:00", "isDaytime": true, "temperature": 63, "temperatureUnit": "F", "temperatureTrend": "falling", "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 20 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 12.222222222222221 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 86 }, "windSpeed": "2 to 10 mph", "windDirection": "S", "icon": "https://api.weather.gov/icons/land/day/bkn/rain,20?size=medium", "shortForecast": "Partly Sunny then Slight Chance Light Rain", "detailedForecast": "A slight chance of rain after 1pm. Partly sunny. High near 63, with temperatures falling to around 61 in the afternoon. South wind 2 to 10 mph. Chance of precipitation is 20%." }, { "number": 6, "name": "Friday Night", "startTime": "2023-11-17T18:00:00-05:00", "endTime": "2023-11-18T06:00:00-05:00", "isDaytime": false, "temperature": 51, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 70 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 12.777777777777779 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 100 }, "windSpeed": "6 to 10 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/night/rain,60/rain,70?size=medium", "shortForecast": "Light Rain Likely", "detailedForecast": "Rain likely. Cloudy, with a low around 51. Chance of precipitation is 70%. New rainfall amounts between a quarter and half of an inch possible." }, { "number": 7, "name": "Saturday", "startTime": "2023-11-18T06:00:00-05:00", "endTime": "2023-11-18T18:00:00-05:00", "isDaytime": true, "temperature": 55, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 70 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 11.111111111111111 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 100 }, "windSpeed": "8 to 18 mph", "windDirection": "NW", "icon": "https://api.weather.gov/icons/land/day/rain,70/rain,30?size=medium", "shortForecast": "Light Rain Likely", "detailedForecast": "Rain likely before 1pm. Partly sunny, with a high near 55. Chance of precipitation is 70%." 
}, { "number": 8, "name": "Saturday Night", "startTime": "2023-11-18T18:00:00-05:00", "endTime": "2023-11-19T06:00:00-05:00", "isDaytime": false, "temperature": 40, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 1.1111111111111112 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 65 }, "windSpeed": "12 to 17 mph", "windDirection": "NW", "icon": "https://api.weather.gov/icons/land/night/few?size=medium", "shortForecast": "Mostly Clear", "detailedForecast": "Mostly clear, with a low around 40." }, { "number": 9, "name": "Sunday", "startTime": "2023-11-19T06:00:00-05:00", "endTime": "2023-11-19T18:00:00-05:00", "isDaytime": true, "temperature": 50, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": -0.55555555555555558 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 65 }, "windSpeed": "10 to 14 mph", "windDirection": "W", "icon": "https://api.weather.gov/icons/land/day/few?size=medium", "shortForecast": "Sunny", "detailedForecast": "Sunny, with a high near 50." }, { "number": 10, "name": "Sunday Night", "startTime": "2023-11-19T18:00:00-05:00", "endTime": "2023-11-20T06:00:00-05:00", "isDaytime": false, "temperature": 38, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": -0.55555555555555558 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 67 }, "windSpeed": "13 mph", "windDirection": "NW", "icon": "https://api.weather.gov/icons/land/night/few?size=medium", "shortForecast": "Mostly Clear", "detailedForecast": "Mostly clear, with a low around 38." }, { "number": 11, "name": "Monday", "startTime": "2023-11-20T06:00:00-05:00", "endTime": "2023-11-20T18:00:00-05:00", "isDaytime": true, "temperature": 46, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": -1.6666666666666667 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 70 }, "windSpeed": "13 mph", "windDirection": "NW", "icon": "https://api.weather.gov/icons/land/day/sct?size=medium", "shortForecast": "Mostly Sunny", "detailedForecast": "Mostly sunny, with a high near 46." }, { "number": 12, "name": "Monday Night", "startTime": "2023-11-20T18:00:00-05:00", "endTime": "2023-11-21T06:00:00-05:00", "isDaytime": false, "temperature": 38, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": -1.1111111111111112 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 70 }, "windSpeed": "10 mph", "windDirection": "N", "icon": "https://api.weather.gov/icons/land/night/sct?size=medium", "shortForecast": "Partly Cloudy", "detailedForecast": "Partly cloudy, with a low around 38." 
}, { "number": 13, "name": "Tuesday", "startTime": "2023-11-21T06:00:00-05:00", "endTime": "2023-11-21T18:00:00-05:00", "isDaytime": true, "temperature": 49, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 30 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 2.7777777777777777 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 73 }, "windSpeed": "9 to 13 mph", "windDirection": "E", "icon": "https://api.weather.gov/icons/land/day/bkn/rain,30?size=medium", "shortForecast": "Partly Sunny then Chance Light Rain", "detailedForecast": "A chance of rain after 1pm. Partly sunny, with a high near 49. Chance of precipitation is 30%." }, { "number": 14, "name": "Tuesday Night", "startTime": "2023-11-21T18:00:00-05:00", "endTime": "2023-11-22T06:00:00-05:00", "isDaytime": false, "temperature": 46, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 50 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 7.7777777777777777 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 86 }, "windSpeed": "13 to 18 mph", "windDirection": "S", "icon": "https://api.weather.gov/icons/land/night/rain,50?size=medium", "shortForecast": "Chance Light Rain", "detailedForecast": "A chance of rain. Mostly cloudy, with a low around 46. Chance of precipitation is 50%." } ] } } We parse the data with EvaluateJSONPath to get primary fields for the weather. We then format those fields to PutSlack. LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude} Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend} There is a wind ${winddirection} at ${windspeed}. ${detailedforecast} Slack Output If we do have an LLM question, let’s make sure it’s just one record. We use a few different models that are available at IBM WatsonX.AI on IBM Cloud to quickly be accessed by our REST prompts. I tested and built the prompts initially at IBM’s Prompt Lab and then copied the initial curl statement from there. Click here for supported foundation models available with IBM watsonx.ai. ibm/mpt-7b-instruct2meta-llama/llama-2–70b-chatibm/granite-13b-chat-v1 We have to send our unique secure key to IBM and they will give us a token to use in our next call. We parse out the question and then send it to WatsonX via REST API. We build a prompt to send to IBM as follows. { "model_id": "meta-llama/llama-2-70b-chat", "input": "${inputs:urlEncode()}", "parameters": { "decoding_method": "greedy", "max_new_tokens": 200, "min_new_tokens": 50, "stop_sequences": [], "repetition_penalty": 1 }, "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" } We parse the generated text which is our Generative AI results plus some helpful metadata on timings. The result posted to Slack is as follows: “You can use Apache NiFi to integrate Generative AI models in several ways: Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi’s PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. 
You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data. Real-time Inference: You can use NiFi’s StreamingJobs” After the Slackbot posted the results, it posted metrics and debugging information to the chat channel. All of the metadata is posted to another Slack channel for administrator monitoring. ==== NiFi to IBM WatsonX.AI LLM Answers On Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z Prompt: Q: What is a good way to integrate Generative AI and Apache NiFi? Response: ) You can use Apache NiFi to integrate Generative AI models in several ways: 1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering. 2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script. 3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data. 4. Real-time Inference: You can use NiFi's StreamingJobs Token: 200 Req Duration: 8153 HTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb IBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx IBM Msg ID: disclaimer_warning Model ID: meta-llama/llama-2-70b-chat Stop Reason: max_tokens Token Count: 38 TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756 UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3 Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756 Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b Service Time: 478 Request ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c File Name: 1a3c4386-86d2-4969-805b-37649c16addb Request Duration: 8153 Request URL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29 cf-ray: 82689bfd28e48ce2-EWR ===== Make Your Own Slackbot Slack Output Kafka Distribute Apache Flink SQL Table Creation DDL CREATE TABLE `ssb`.`Meetups`.`watsonairesults` ( `date` VARCHAR(2147483647), `x_global_transaction_id` VARCHAR(2147483647), `x_request_id` VARCHAR(2147483647), `cf_ray` VARCHAR(2147483647), `inputs` VARCHAR(2147483647), `created_at` VARCHAR(2147483647), `stop_reason` VARCHAR(2147483647), `x_correlation_id` VARCHAR(2147483647), `x_proxy_upstream_service_time` VARCHAR(2147483647), `message_id` VARCHAR(2147483647), `model_id` VARCHAR(2147483647), `invokehttp_request_duration` VARCHAR(2147483647), `message` VARCHAR(2147483647), `uuid` VARCHAR(2147483647), `generated_text` VARCHAR(2147483647), `transaction_id` VARCHAR(2147483647), `tokencount` VARCHAR(2147483647), `generated_token` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `advisoryId` VARCHAR(2147483647), `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND ) WITH ( 'deserialization.failure.policy' = 'ignore_and_log', 'properties.request.timeout.ms' = '120000', 'format' = 'json', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka', 'properties.transaction.timeout.ms' = 
'900000', 'topic' = 'watsonxaillmanswers', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id' = 'watsonxaillmconsumer' ) CREATE TABLE `ssb`.`Meetups`.`watsonxresults` ( `date` VARCHAR(2147483647), `x_global_transaction_id` VARCHAR(2147483647), `x_request_id` VARCHAR(2147483647), `cf_ray` VARCHAR(2147483647), `inputs` VARCHAR(2147483647), `created_at` VARCHAR(2147483647), `stop_reason` VARCHAR(2147483647), `x_correlation_id` VARCHAR(2147483647), `x_proxy_upstream_service_time` VARCHAR(2147483647), `message_id` VARCHAR(2147483647), `model_id` VARCHAR(2147483647), `invokehttp_request_duration` VARCHAR(2147483647), `message` VARCHAR(2147483647), `uuid` VARCHAR(2147483647), `generated_text` VARCHAR(2147483647), `transaction_id` VARCHAR(2147483647), `tokencount` VARCHAR(2147483647), `generated_token` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND ) WITH ( 'deserialization.failure.policy' = 'ignore_and_log', 'properties.request.timeout.ms' = '120000', 'format' = 'json', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka', 'properties.transaction.timeout.ms' = '900000', 'topic' = 'watsonxaillm', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id' = 'allwatsonx1' ) Example Prompt {"inputs":"Please answer to the following question. What is the capital of the United States?"} IBM DB2 SQL alter table "DB2INST1"."TRAVELADVISORY" add column "summary" VARCHAR(2048); -- DB2INST1.TRAVELADVISORY definition CREATE TABLE "DB2INST1"."TRAVELADVISORY" ( "TITLE" VARCHAR(250 OCTETS) , "PUBDATE" VARCHAR(250 OCTETS) , "LINK" VARCHAR(250 OCTETS) , "GUID" VARCHAR(250 OCTETS) , "ADVISORYID" VARCHAR(250 OCTETS) , "DOMAIN" VARCHAR(250 OCTETS) , "CATEGORY" VARCHAR(4096 OCTETS) , "DESCRIPTION" VARCHAR(4096 OCTETS) , "UUID" VARCHAR(250 OCTETS) NOT NULL , "TS" BIGINT NOT NULL , "summary" VARCHAR(2048 OCTETS) ) IN "IBMDB2SAMPLEREL" ORGANIZE BY ROW; ALTER TABLE "DB2INST1"."TRAVELADVISORY" ADD PRIMARY KEY ("UUID") ENFORCED; GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1"; GRANT CONTROL ON INDEX "SYSIBM "."SQL230620142604860" TO USER "DB2INST1"; SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE FROM DB2INST1.TRAVELADVISORY t WHERE "summary" IS NOT NULL ORDER BY ts DESC Example Output Email GitHub README GitHub repo Video Source Code Source Code
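As a quick aside, once the Flink tables above are registered, the stored LLM answers can be explored with an ordinary continuous query. The statement below is a hypothetical illustration against the watsonairesults table defined above; it is not part of the original flow.
SQL
-- Hypothetical follow-up query: stream the model answers together with their timing metadata.
SELECT `date`, model_id, generated_text, tokencount, invokehttp_request_duration
FROM `ssb`.`Meetups`.`watsonairesults`
WHERE generated_text IS NOT NULL;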
In the vast landscape of big data processing, Apache Spark stands out as a powerful and versatile framework. While developing Spark applications is crucial, deploying and executing them efficiently is equally vital. One key aspect of deploying Spark applications is the use of "spark-submit," a command-line interface that facilitates the submission of Spark applications to a cluster. Understanding Spark Submit At its core, spark-submit is the entry point for submitting Spark applications. Whether you are dealing with a standalone cluster, Apache Mesos, Hadoop YARN, or Kubernetes, spark-submit acts as the bridge between your developed Spark code and the cluster where it will be executed. Configuring Spark Submit Configuring spark-submit is a crucial aspect of deploying Apache Spark applications, allowing developers to optimize performance, allocate resources efficiently, and tailor the execution environment to specific requirements. Here's a guide on configuring spark-submit for various scenarios: 1. Specifying the Application JAR Use the --class option to specify the main class for a Java/Scala application or the script file for a Python/R application. spark-submit --class com.example.MainClass mysparkapp.jar 2. Setting Master and Deploy Mode Specify the Spark master URL using the --master option. Choose the deploy mode with --deploy-mode (client or cluster). spark-submit --master spark://<master-url> --deploy-mode client mysparkapp.jar 3. Configuring Executor and Driver Memory Allocate memory for executors using --executor-memory. Set driver memory using --driver-memory. spark-submit --executor-memory 4G --driver-memory 2G mysparkapp.jar 4. Adjusting Executor Cores Use --executor-cores to specify the number of cores for each executor. spark-submit --executor-cores 4 mysparkapp.jar 5. Dynamic Allocation Enable dynamic allocation to dynamically adjust the number of executors based on workload. spark-submit --conf spark.dynamicAllocation.enabled=true mysparkapp.jar 6. Setting Configuration Properties Pass additional Spark configurations using --conf. spark-submit --conf spark.shuffle.compress=true mysparkapp.jar 7. External Dependencies Include external JARs using --jars. For Python dependencies, use --py-files. spark-submit --jars /path/to/dependency.jar mysparkapp.jar 8. Cluster Manager Integration For YARN, set the YARN queue using --queue. For Kubernetes, use --master k8s://<k8s-apiserver>. spark-submit --master yarn --deploy-mode cluster --queue myQueue mysparkapp.jar 9. Debugging and Logging Increase logging verbosity for debugging with --verbose. Redirect logs to a file using --conf spark.logFile=spark.log. spark-submit --verbose --conf spark.logFile=spark.log mysparkapp.jar 10. Application Arguments Pass arguments to your application after specifying the JAR file. spark-submit mysparkapp.jar arg1 arg2 Conclusion In this article, we delve into the nuances of spark-submit to empower developers with the knowledge needed for effective Spark application deployment. By mastering this command-line interface, developers can unlock the true potential of Apache Spark, ensuring that their big data applications run efficiently and seamlessly across diverse clusters. Stay tuned as we explore each facet of spark-submit to elevate your Spark deployment skills.
This is an article from DZone's 2023 Observability and Application Performance Trend Report.For more: Read the Report AIOps applies AI to IT operations, enabling agility, early issue detection, and proactive resolution to maintain service quality. AIOps integrates DataOps and MLOps, enhancing efficiency, collaboration, and transparency. It aligns with DevOps for application lifecycle management and automation, optimizing decisions throughout DataOps, MLOps, and DevOps. Observability for IT operations is a transformative approach that provides real-time insights, proactive issue detection, and comprehensive performance analysis, ensuring the reliability and availability of modern IT systems. Why AIOps Is Fundamental to Modern IT Operations AIOps streamlines operations by automating problem detection and resolution, leading to increased IT staff efficiency, outage prevention, improved user experiences, and optimized utilization of cloud technologies. The major contributions of AIOps are shared in Table 1: CONTRIBUTIONS OF AIOPS Key Functions Function Explanations Event correlation Uses rules and logic to filter and group event data, prioritizing service issues based on KPIs and business metrics. Anomaly detection Identifies normal and abnormal behavior patterns, monitoring multiple services to predict and mitigate potential issues. Automated incident management Aims to automate all standardized, high-volume, error-sensitive, audit-critical, repetitive, multi-person, and time-sensitive tasks. Meanwhile, it preserves human involvement in low ROI and customer support-related activities. Performance optimization Analyzes large datasets employing AI and ML, proactively ensuring service levels and identifying issue root causes. Enhanced collaboration Fosters collaboration between IT teams, such as DevOps, by providing a unified platform for monitoring, analysis, and incident response. Table 1 How Does AIOps Work? AIOps involves the collection and analysis of vast volumes of data generated within IT environments, such as network performance metrics, application logs, and system alerts. AIOps uses these insights to detect patterns and anomalies, providing early warnings for potential issues. By integrating with other DevOps practices, such as DataOps and MLOps, it streamlines processes, enhances efficiency, and ensures a proactive approach to problem resolution. AIOps is a crucial tool for modern IT operations, offering the agility and intelligence required to maintain service quality in complex and dynamic digital environments. Figure 1: How AIOps works Popular AIOps Platforms and Key Features Leading AIOps platforms are revolutionizing IT operations by seamlessly combining AI and observability, enhancing system reliability, and optimizing performance across diverse industries. The following tools are just a few of many options: Prometheus acts as an efficient AIOps platform by capturing time-series data, monitoring IT environments, and providing anomaly alerts. OpenNMS automatically discovers, maps, and monitors complex IT environments, including networks, applications, and systems. Shinken enables users to monitor and troubleshoot complex IT environments, including networks and applications. The key features of the platforms and the role they play in AIOps are shared in Table 2: KEY FEATURES OF AIOPS PLATFORMS AND THE CORRESPONDING TASKS Features Tasks Visibility Provides insight into the entire IT environment, allowing for comprehensive monitoring and analysis. 
Monitoring and management Monitors the performance of IT systems and manages alerts and incidents. Performance Measures and analyzes system performance metrics to ensure optimal operation. Functionality Ensures that the AIOps platform offers a range of functionalities to meet various IT needs. Issue resolution Utilizes AI-driven insights to address and resolve IT issues more effectively. Analysis Analyzes data and events to identify patterns, anomalies, and trends, aiding in proactive decision-making. Table 2 Observability's Role in IT Operations Observability plays a pivotal role in IT operations by offering the means to monitor, analyze, and understand the intricacies of complex IT systems. It enables continuous tracking of system performance, early issue detection, and root cause analysis. Observability data empowers IT teams to optimize performance, allocate resources efficiently, and ensure a reliable user experience. It supports proactive incident management, compliance monitoring, and data-driven decision-making. In a collaborative DevOps environment, observability fosters transparency and enables teams to work cohesively toward system reliability and efficiency. Data sources like logs, metrics, and traces play a crucial role in observability by providing diverse and comprehensive insights into the behavior and performance of IT systems. ROLES OF DATA SOURCES Logs Metrics Traces Event tracking Root cause analysis Anomaly detection Compliance and auditing Performance monitoring Threshold alerts Capacity planning End-to-end visibility Latency analysis Dependency mapping Table 3 Challenges of Observability Observability is fraught with multiple technical challenges. Accidental invisibility takes place where critical system components or behaviors are not being monitored, leading to blind spots in observability. The challenge of insufficient source data can result in incomplete or inadequate observability, limiting the ability to gain insights into system performance. Dealing with multiple information formats poses difficulties in aggregating and analyzing data from various sources, making it harder to maintain a unified view of the system. Popular Observability Platforms and Key Features Observability platforms offer a set of key capabilities essential for monitoring, analyzing, and optimizing complex IT systems. OpenObserve provides scheduled and real-time alerts and reduces operational costs. Vector allows users to collect and transform logs, metrics, and traces. The Elastic Stack — comprising Elasticsearch, Kibana, Beats, and Logstash — can search, analyze, and visualize data in real time. The capabilities of observability platforms include real-time data collection from various sources such as logs, metrics, and traces, providing a comprehensive view of system behavior. They enable proactive issue detection, incident management, root cause analysis, system reliability aid, and performance optimization. Observability platforms often incorporate machine learning for anomaly detection and predictive analysis. They offer customizable dashboards and reporting for in-depth insights and data-driven decision-making. These platforms foster collaboration among IT teams by providing a unified space for developers and operations to work together, fostering a culture of transparency and accountability. 
Leveraging AIOps and Observability for Enhanced Performance Analytics Synergizing AIOps and observability represents a cutting-edge strategy to elevate performance analytics in IT operations, enabling data-driven insights, proactive issue resolution, and optimized system performance. Observability Use Cases Best Supported by AIOps Elevating cloud-native and hybrid cloud observability with AIOps: AIOps transcends the boundaries between cloud-native and hybrid cloud environments, offering comprehensive monitoring, anomaly detection, and seamless incident automation. It adapts to the dynamic nature of cloud-native systems while optimizing on-premises and hybrid cloud operations. This duality makes AIOps a versatile tool for modern enterprises, ensuring a consistent and data-driven approach to observability, regardless of the infrastructure's intricacies. Seamless collaboration of dev and ops teams with AIOps: AIOps facilitates the convergence of dev and ops teams in observability efforts. By offering a unified space for data analysis, real-time monitoring, and incident management, AIOps fosters transparency and collaboration. It enables dev and ops teams to work cohesively, ensuring the reliability and performance of IT systems. Challenges To Adopting AIOps and Observability The three major challenges to adopting AIOps and observability are data complexity, integration complexity, and data security. Handling the vast and diverse data generated by modern IT environments can be overwhelming. Organizations need to manage, store, and analyze this data efficiently. Integrating AIOps and observability tools with existing systems and processes can be complex and time-consuming, potentially causing disruptions if not executed properly. The increased visibility into IT systems also raises concerns about data security and privacy. Ensuring the protection of sensitive information is crucial. Impacts and Benefits of Combining AIOps and Observability Across Sectors The impacts and benefits of integrating AIOps and observability transcend industries, enhancing reliability, efficiency, and performance across diverse sectors. It helps in improved incident response by using machine learning to detect patterns and trends, enabling proactive issue resolution, and minimizing downtime. Predictive analytics anticipates capacity needs and optimizes resource allocation in advance, which ensures uninterrupted operations. Full-stack observability leverages data from various sources — including metrics, events, logs, and traces (MELT) — to gain comprehensive insights into system performance, supporting timely issue identification and resolution. MELT capabilities are the key drivers where metrics help pinpoint issues, events automate alert prioritization, logs aid in root cause analysis, and traces assist in locating problems within the system. All contribute to improved operational efficiency. APPLICATION SCENARIOS OF COMBINING AIOPS AND OBSERVABILITY Industry Sectors Key Contributions Finance Enhance fraud detection, minimize downtime, and ensure compliance with regulatory requirements, thus safeguarding financial operations. Healthcare Improve patient outcomes by guaranteeing the availability and performance of critical healthcare systems and applications, contributing to better patient care. Retail Optimize supply chain operations, boost customer experiences, and maintain online and in-store operational efficiency. 
Manufacturing Enhance the reliability and efficiency of manufacturing processes through predictive maintenance and performance optimization. Telecommunications Support network performance to ensure reliable connectivity and minimal service disruptions. E-commerce Real-time insights into website performance, leading to seamless shopping experiences and improved conversion rates. Table 4 The application scenarios of combining AIOps and observability span diverse industries, showcasing their transformative potential in improving system reliability, availability, and performance across the board. Operational Guidance for AIOps Implementation Operational guidance for AIOps implementation offers a strategic roadmap to navigate the complexities of integrating AI into IT operations, ensuring successful deployment and optimization. Figure 2: Steps for implementing AIOps The Future of AIOps in Observability: The Road Ahead AIOps' future in observability promises to be transformative. As IT environments become more complex and dynamic, AIOps will play an increasingly vital role in ensuring system reliability and performance and will continue to evolve, integrating with advanced technologies like cognitive automation, natural language understanding (NLU), large language models (LLMs), and generative AI. APPLICATION SCENARIOS OF COMBINING AIOPS AND OBSERVABILITY Impact Area Role of AIOps Synergy With Cognitive Automation LLM and Generative AI Integration Data collection and analysis Collects and analyzes a wide range of IT data, including performance metrics, logs, and incidents Process unstructured data, such as emails, documents, and images Predict potential issues based on historical data patterns and generate reports Incident management Automatically detects, prioritizes, and responds to IT incidents Extract relevant information from incident reports and suggest or implement appropriate actions Understand its context and generate appropriate responses Root cause analysis Identifies root causes of incidents Access historical documentation and knowledge bases to offer detailed explanations and solutions Provide recommendations by analyzing historical data for resolving issues NLU Uses NLU to process user queries and understand context Engage in natural language conversations with IT staff or end-users, improving user experiences Power chatbots and virtual IT assistants, offering user-friendly interaction and support to answer queries and provide guidance Table 5 Conclusion The fusion of AI/ML with AIOps has ushered in a new era of observability. IT operations are constantly evolving, and so is the capability to monitor, analyze, and optimize performance. In the age of AI/ML-driven observability, our IT operations won't merely survive, but will thrive, underpinned by data-driven insights, predictive analytics, and an unwavering commitment to excellence. References: OpenNMS repositories, GitHub OpenObserve repositories, GitHub OpsPAI/awesome-AIOps, GitHub Precompiled binaries and Docker images for Prometheus components Shinken documentation This is an article from DZone's 2023 Observability and Application Performance Trend Report.For more: Read the Report
Hello everyone! In this article, I want to share my knowledge and opinion about the data types that are often used as an identifier. Today we will touch on two topics at once: measuring lookup speed by key, and comparing the data types used to store the key on the database side. I will use a PostgreSQL database and a demo Java service to compare query speeds. UUID and ULID Why do we need these seemingly obscure types for IDs? I won’t talk about distributed systems, connectivity of services, sensitive data, and the like. If someone is interested in this, they can Google it; at the moment we are interested in performance. As the name suggests, we will talk about two types of keys: UUID and ULID. UUID has long been known to everyone, but ULID may be unfamiliar to some. The main advantage of ULID is that it is monotonically increasing and is a sortable type. Naturally, these are not all the differences. Personally, I also like the fact that there are no special characters in it. A small digression: I noticed a long time ago that many teams use the varchar(36) data type to store UUIDs in PostgreSQL, and I don’t like this, since this database has a corresponding uuid data type. A little later, we will see which type is preferable on the database side. Therefore, we will look not only at a comparison of the two data types on the backend side but also at the difference when storing UUID in different formats on the database side. Comparison So let's start comparing things. The UUID is 36 characters long and takes up 128 bits of memory. The ULID is 26 characters long and also takes up 128 bits of memory. For my examples, I created two tables in the database with three fields: SQL CREATE TABLE test.speed_ulid ( id varchar(26) PRIMARY KEY, name varchar(50), created timestamp ); CREATE TABLE test.speed_uuid ( id varchar(36) PRIMARY KEY, name varchar(50), created timestamp ); For the first comparison, I stored the UUID in varchar(36) format, as is often done. I inserted 1,000,000 rows into each of the tables. The test case will consist of 100 requests using identifiers previously pulled from the database; that is, when calling the test method, we will access the database 100 times and retrieve the entity by key. The connection will be created and warmed up before measurement. We will conduct two test runs and then 10 effective iterations. For your convenience, I will provide a link to the Java code at the end of the article. The measurements were taken on a standard MacBook Pro laptop rather than a dedicated server, but I don't believe there would be a significant difference in the results other than increased time spent on network traffic between the database and the backend. Here is some background information: # CPU I9-9980HK # CPU count: 16 # RAM: 32GB # JMH version: 1.37 # VM version: JDK 11.0.12, Java HotSpot(TM) 64-Bit Server VM, 11.0.12+8-LTS-237 # DB: PostgreSQL 13.4, build 1914, 64-bit Queries that will be used to obtain an entity by key: SQL SELECT * FROM test.speed_ulid where id = ? SELECT * FROM test.speed_uuid where id = ? Measurement Results Let's look at the measurement results. Let me remind you that each table has 1,000,000 rows. Both Types of Identifiers Are Stored in the Database as varchar I ran this test several times, and the result was about the same: either the ULID was a little faster, or the UUID. In percentage terms, the difference is practically zero. At this point, you might conclude that there is no difference between these types.
But I would say that we are not limited to varchar: we can use other data types on the database side. UUID as uuid, ULID as varchar in DB For the next test, I changed the data type from varchar(36) to uuid in the test.speed_uuid table. In this case, the difference is obvious: 4.5% in favor of UUID. As you can see, it makes sense to use the uuid data type on the database side when the type of the same name is used on the service side. The index for this format is very well optimized in PostgreSQL and shows good results. Well, now we can definitely part ways. Or not? If you look at the index search query plan, you can see the following ((id)::text = '01HEE5PD6HPWMBNF7ZZRF8CD9R'::text) in the case when we use varchar. In general, comparing two text variables is a rather slow operation, so maybe there is no need to store the ID in this format. Or are there other ways to speed up key comparison? First, let's create another index of the kind “hash” for the table with ULID. SQL create index speed_ulid_id_index on test.speed_ulid using hash (id); Let's look at the execution plan for our query: We will see that the database uses a hash index, and not a btree, in this case. Let's run our test and see what happens. varchar + index(hash) for ULID, uuid for UUID This combination gave an increase of 2.3% relative to the uuid column with its native btree index. I'm not sure that keeping two indexes on one field can really be justified, so it's worth considering whether there's more you can do. And here it’s worth looking into the past and remembering how uuid or some other string identifiers used to be stored. That's right: either text or a byte array. So let's try this option: I removed all the indexes for the ULID, cast it to bytea, and recreated the primary key (a rough sketch of this variant appears near the end of the article). bytea for ULID, uuid for UUID As a result, we got approximately the same result as in the previous run with an additional index, but I personally like this option better. Measurement result with 2,000,000 rows in the database: Measurement result with 3,000,000 rows in the database: I think there is no point in continuing measurements further. The pattern remains: ULID saved as bytea slightly outperforms UUID saved as uuid in the DB. If we take the data from the first measurements, it is obvious that with the help of these small manipulations, you can gain about 9% in performance compared with storing both keys as varchar. So, if you have read this far, I assume the article was interesting to you and you have already drawn some conclusions for yourself. It is worth noting that the measurements were made under ideal conditions for both the backend part and the database. We did not have any parallel processes running that write something to the database, change records, or perform complex calculations on the back-end side. Conclusions Let's go over the material. What did you learn that was useful? Do not neglect the uuid data type on the PostgreSQL side. Perhaps someday extensions for ULID will appear in this database, but for now, we have what we have. Sometimes it is worth creating an additional index of the desired type manually, but there is an overhead to consider. If you are not afraid of extra work - namely, writing your own converters for types - then you should try bytea if there is no corresponding type for your identifier on the database side. What type of data should be used for the primary key and in what format should it be stored? I don’t have a definite answer to these questions: it all depends on many factors.
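For reference, here is a rough sketch of the bytea variant discussed above. This is assumed DDL for illustration rather than the exact script from the linked project; the 16-byte binary form of the ULID is produced by a converter on the Java side before insertion.
SQL
-- Assumed variant of the test table with the ULID stored as raw bytes (16 bytes per key).
CREATE TABLE test.speed_ulid_bytea (
    id      bytea PRIMARY KEY,   -- 128-bit ULID in binary form
    name    varchar(50),
    created timestamp
);
-- The lookup query keeps the same shape; the parameter is now bound as a byte array.
SELECT * FROM test.speed_ulid_bytea WHERE id = ?;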
It is also worth noting that a competent choice of data type for ID, and not only for it, can at some point play an important role in your project. I hope this article was useful to you. Good luck! Project on GitHub
In contemporary web development, a recurring challenge revolves around harmonizing the convenience and simplicity of using a database with a web application. My name is Viacheslav Aksenov, and in this article, I aim to explore several of the most popular approaches for integrating databases and web applications within the Kubernetes ecosystem. These examples are examined within the context of a testing environment, where constraints are more relaxed. However, these practices can serve as a foundation applicable to production environments as well. One Service, One Database. Why? Running a database alongside a microservice aligns with the principles outlined in the Twelve-Factor App methodology. One key factor is "Backing Services" (Factor IV), which suggests treating databases, message queues, and other services as attached resources to be attached or detached seamlessly. By co-locating the database with the microservice, we adhere to the principle of having a single codebase that includes the application and its dependencies, making it easier to manage, scale, and deploy. Additionally, it promotes encapsulation and modularity, allowing the microservice to be self-contained and portable across different environments, following the principles of the Twelve-Factor App. This approach enhances the maintainability and scalability of the entire application architecture. For this task, you can leverage various tools, and one example is using KubeDB. What Is KubeDB? KubeDB is an open-source project that provides a database management framework for Kubernetes, an open-source container orchestration platform. KubeDB simplifies the deployment, management, and scaling of various database systems within Kubernetes clusters. We benefited from the following features of this tool: Database operators: a Postgres operator to simplify the process of deploying and managing database instances on Kubernetes. Monitoring and alerts: KubeDB integrates with monitoring and alerting tools like Prometheus and Grafana, enabling you to keep an eye on the health and performance of your database instances. Security: KubeDB helps you set up secure access to your databases using authentication mechanisms and secrets management. And it is very easy to set up the deployment.
deployment.yaml: YAML apiVersion: kubedb.com/v1alpha2 kind: PostgreSQL metadata: name: your-postgresql spec: version: "11" storageType: Durable storage: storageClassName: <YOUR_STORAGE_CLASS> accessModes: - ReadWriteOnce resources: requests: storage: 1Gi terminationPolicy: WipeOut databaseSecret: secretName: your-postgresql-secret databaseURLFromSecret: true replicas: 1 users: - name: <YOUR_DB_USER> passwordSecret: secretName: your-postgresql-secret passwordKey: password databaseName: <YOUR_DB_NAME> Then, you can use the credentials and properties of this database to connect your service's pod to it with deployment.yaml: YAML apiVersion: apps/v1 kind: Deployment metadata: name: your-microservice spec: replicas: 1 selector: matchLabels: app: your-microservice template: metadata: labels: app: your-microservice spec: containers: - name: your-microservice-container image: your-microservice-image:tag ports: - containerPort: 80 env: - name: DATABASE_URL value: "postgres://<YOUR_DB_USER>:<YOUR_DB_PASSWORD>@<YOUR_DB_HOST>:<YOUR_DB_PORT>/<YOUR_DB_NAME>" --- apiVersion: v1 kind: Service metadata: name: your-microservice-service spec: selector: app: your-microservice ports: - protocol: TCP port: 80 targetPort: 80 And if, for some reason, you are not ready to use KubeDB or don't require the full functional of their product, you can use the Postgresql container as a sidecar for your test environment. Postgres Container as a Sidecar In the context of Kubernetes and databases like PostgreSQL, a sidecar is a separate container that runs alongside the main application container within a pod. The sidecar pattern is commonly used to enhance or extend the functionality of the main application container without directly impacting its core logic. Let's see the example of a configuration for a small Spring Boot Kotlin service that handles cat names. deployment.yaml: YAML apiVersion: apps/v1 kind: Deployment metadata: name: cat-svc labels: app: cat-svc spec: replicas: 1 selector: matchLabels: app: cat-svc template: metadata: labels: app: cat-svc type: http spec: containers: - name: cat-svc image: cat-svc:0.0.1 ports: - name: http containerPort: 8080 protocol: TCP readinessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 30 timeoutSeconds: 10 periodSeconds: 10 livenessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 60 timeoutSeconds: 10 periodSeconds: 30 env: - name: PLACES_DATABASE value: localhost:5432/cats - name: POSTGRES_USER value: pwd - name: POSTGRES_PASSWORD value: postgres - name: cat-postgres image: postgres:11.1 ports: - name: http containerPort: 5432 protocol: TCP env: - name: POSTGRES_USER value: pwd - name: POSTGRES_PASSWORD value: postgres - name: POSTGRES_DB value: cats Dockerfile FROM gradle:8.3.0-jdk17 COPY . . EXPOSE 8080 CMD ["gradle", "bootRun"] And for local run, it is possible to use docker-compose with the following configuration. 
docker-compose.yaml: YAML version: '3.8' services: cat-postgres: image: postgres:12.13 restart: always ports: - "5432:5432" environment: POSTGRES_PASSWORD: postgres POSTGRES_USER: postgres POSTGRES_DB: cats # volumes: # - ./init.sql:/docker-entrypoint-initdb.d/create_tables.sql - if you want to run any script before an app # - ./db-data/:/var/lib/postgresql/data/ service: image: cat-svc:0.0.1 restart: always ports: - '8080:8080' environment: SPRING_PROFILES_ACTIVE: prod PLACES_DATABASE: cat-postgres:5432/cats POSTGRES_PASSWORD: postgres POSTGRES_USER: postgres Migrations The big thing that has to be decided before using this approach is the migration question. The best option in this approach is to delegate the migration process to any tool that can work within your app infrastructure. For example, in the Java world, you could use Flyway or Liquibase (a minimal Flyway migration is sketched at the end of this article). Flyway is a popular open-source database migration tool. It allows you to version control your database schema and apply changes in a structured manner. Flyway supports multiple databases, including PostgreSQL, MySQL, and Oracle. Liquibase is an open-source database migration tool that supports tracking, managing, and applying database changes. It provides a way to define database changes using XML, YAML, or SQL, and it supports various databases. Pros of Using a PostgreSQL Sidecar in Kubernetes Separation of concerns: Sidecars allow you to separate specific functionalities (e.g., database migrations, backups) from the main application logic. Compliance with microservice architecture. Simplified deployment: Sidecars can be deployed and managed alongside the main application using the same deployment configurations, simplifying the overall deployment process. You don't need to maintain a separate database for the test environment, which reduces the complexity of tests (you don't need to think about collisions when many CI runs execute tests against the same table). Cons of Using a PostgreSQL Sidecar in Kubernetes Resource overhead: Running additional containers consumes resources (CPU, memory) on the node, which may impact the overall performance and resource utilization of the Kubernetes cluster. It's best to use as few resources as possible. Startup order: The main application may become dependent on the sidecar for certain functionalities, potentially leading to issues if there are discrepancies or version mismatches between the main application and the sidecar. Arranging containers in a specific order without additional configuration can be somewhat challenging. However, this shouldn't pose a problem in test environments due to the quick startup of the PostgreSQL container. In most scenarios, the PostgreSQL container will initiate before any of your business applications. Even if the application attempts to run before PostgreSQL is ready, it will encounter a failure and be automatically restarted by the default Kubernetes mechanism until the database becomes available. Learning curve: Adopting the sidecar pattern may require a learning curve for development and operations teams, particularly if they are new to the concept of containerized sidecar architectures. Once the setup is complete, new team members should encounter no issues with this approach. Conclusion In conclusion, the choice between using KubeDB and the PostgreSQL sidecar approach for integrating web applications and databases in a test environment ultimately depends on your specific requirements and preferences.
KubeDB offers a comprehensive solution with Kubernetes-native features, streamlining the management of databases alongside web services. On the other hand, the PostgreSQL sidecar approach provides flexibility and fine-grained control over how databases and web applications interact. Whether you opt for the simplicity and seamless integration provided by KubeDB or the customization potential inherent in the sidecar pattern, both approaches lay a solid foundation for test environments. The key lies in understanding the unique demands of your project and selecting the method that aligns best with your development workflow, scalability needs, and overall application architecture. Whichever path you choose, the insights gained from exploring these approaches in a test setting can pave the way for a robust and efficient integration strategy in your production environment.
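Returning to the migration question above, a minimal hypothetical Flyway migration for the cat-svc example could look like the following; the file name, location, and columns are assumptions for illustration.
SQL
-- e.g., src/main/resources/db/migration/V1__create_cats_table.sql
CREATE TABLE IF NOT EXISTS cats (
    id         BIGSERIAL PRIMARY KEY,
    name       VARCHAR(100) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT now()
);
With the Spring Boot Flyway integration enabled, versioned scripts like this are applied automatically on application startup against whichever database the service points at, including a sidecar container.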
Snowflake's evolution over the last few years is simply amazing. It is currently a data platform with a great ecosystem both in terms of partners and a wide variety of components like Snowgrid, Snowpark, or Streamlit, but in this article, we are going to focus on its role as a modern cloud-based data warehouse. It revolutionizes the traditional concept of data warehousing; it offers a more agile and scalable platform that separates storage, computing, and services, allowing each component to scale independently. This means you can store unlimited data, ramp up or down your computing resources based on your querying needs, and only pay for what you use. Currently, we can say that Snowflake is mainly an Online Analytical Processing (OLAP) type solution, but as we will see further on, it is evolving to provide transactional and analytical capabilities in a single platform. Below is a high-level architecture diagram showing the layers that are part of Snowflake. Cloud Services Layer: It coordinates and handles tasks that are not specific to querying or storing data. It includes several tasks, such as authenticating user sessions, role-based access control, or ensuring transactional consistency. Compute Layer: This layer is where the actual data processing happens. It comprises one or multiple virtual warehouses, which are essentially clusters of compute resources. Each virtual warehouse can scale up or down independently and can be started or stopped to optimize costs. Storage Layer: This layer is responsible for the storage of structured and semi-structured data. It is stored in cloud storage in a columnar format. Optimized for Analytical Queries Snowflake is designed for big data analytics. It can handle complex queries on large datasets efficiently due to its columnar storage and massively parallel processing (MPP) architecture. Analytical queries typically work with a subset of columns and operations to aggregate, transform, and analyze vast volumes of data to provide insights, trends, or patterns. These are some of the common operations used in analytical queries: Aggregations: Functions like SUM(), AVG(), COUNT(), and MAX() are usually used to summarize data. Range scans: Scan wide ranges of data (e.g., WHERE sale_date BETWEEN '2022-01-01' AND '2022-12-31'). Group by: Grouping data using GROUP BY clauses in combination with aggregation functions to provide summaries by some attribute. Ordering and window functions: Use ordering (ORDER BY) and window functions (e.g., ROW_NUMBER(), LAG(), LEAD()) to calculate running totals, ranks, and other advanced analytics. Let's see these operations in an example to help us understand them.
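The sketch below pulls these operations together in a single query; the sales table and its columns are illustrative assumptions rather than objects taken from the article.
SQL
-- Illustrative analytical query: yearly sales per product, ranked by total revenue.
SELECT
    product_id,
    COUNT(*)                                AS num_sales,      -- aggregation
    SUM(amount)                             AS total_revenue,  -- aggregation
    AVG(amount)                             AS avg_sale,       -- aggregation
    RANK() OVER (ORDER BY SUM(amount) DESC) AS revenue_rank    -- window function
FROM sales
WHERE sale_date BETWEEN '2022-01-01' AND '2022-12-31'          -- range scan
GROUP BY product_id                                            -- grouping
ORDER BY total_revenue DESC;                                   -- ordering
Only product_id, amount, and sale_date are touched, which is exactly the access pattern that columnar storage, described next, is built to serve.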
Columnar Storage, the Foundation of Performance Columnar storage, as opposed to row-based storage, manages data using columns (product_id, name, sale_date, etc.) as logical units that are used to store the information in memory. Each logical unit always stores the same data type, which means that the adjacent data in memory all have the same type of data. This strategy provides a number of performance benefits: Data access efficiency: Aggregation queries, like those calculating sums or averages, often require data from only a few columns rather than the entire row. In columnar storage, data is stored column by column. This means that when executing an aggregation on a specific column, the database system can read only the data for that column, skipping over all other unrelated columns. This selective reading can significantly reduce I/O operations and speed up query performance. Compression: Data within a column tends to be more homogeneous (i.e., of the same type and often with similar values) compared to data within a row. This homogeneity makes column data more amenable to compression techniques. For example, if a column storing a month of transaction dates mostly has the same few dates, you can represent those repeated dates once with a count instead of storing them repeatedly. Effective compression reduces storage costs and can also boost performance by reducing the amount of data read from storage. Better CPU cache utilization: Since columnar databases read contiguous memory blocks from a single column, they can better utilize CPU caches. The data loaded into the cache is more likely to be used in the subsequent operations, leading to fewer cache misses. On the other hand, in row-based systems, if only a few columns from a wide row are needed, much of the cached data might go unused. Eliminating irrelevant data quickly: Many columnar databases use metadata about blocks of columnar data, like min and max values. This metadata can quickly determine if a block contains relevant data or can be skipped entirely. For instance, if a query is filtering for pricing over 200, and the maximum value in a block is 110, the entire block can be ignored. In the following diagram, we explain in a simple way how columnar storage could work to help you understand why it is efficient in analytical queries; it does not mean that Snowflake implements exactly this logic. In this example, the values of each column can be stored in the same order: the first value of product_id corresponds to the first value of sale_date and to the first value of amount; the second values likewise line up across columns, and so on. Therefore, when you filter by date, you can quickly get the offsets for the start and end of the timestamp range, use the same offsets to locate the corresponding values in the amount column, and perform the necessary calculations. Unistore Unifying Analytical and Transactional Data Snowflake is evolving its platform by applying a modern approach to provide transactional and analytical data operations together in a single platform. The new feature is called Unistore and enables transactional workloads by offering fast single-row operations. Therefore, Snowflake joins a small group of cloud-based databases that offer this type of capability, such as SingleStore or MySQL HeatWave. This feature is still in private preview and has limited access, so we will have to verify the latency times. It should be considered that there are other features of transactional and relational databases, such as referential integrity, that are not supported. Row Storage, Transactional Performance Typically, databases are oriented to work at the row level, and queries or operations use row-based storage or row-oriented storage. It is a method in which data is stored by rows. It is especially effective for online transaction processing (OLTP) workloads that frequently involve single-row queries or operations. Some of the benefits of using this type of storage are listed below: Fewer columns in OLTP queries: Transactional queries, like those from web applications or operational systems, often involve a limited number of columns but require complete rows. In such scenarios, reading a full row from row-based storage is more efficient than assembling a row from multiple columns in columnar storage.
Optimized for transactional workloads: OLTP systems often have a high number of small, frequent read and write operations. When updating or inserting a new row in row-based storage, the database writes the whole row at once. This contrasts with columnar systems where an insert or update might involve writing data across various column files. Locking and concurrency: Row-based databases are often optimized for row-level locking. This means that when a row is being updated, the database can lock just that specific row, allowing other operations to proceed concurrently on other rows. This level of granularity in locking is beneficial for high-concurrency transactional systems. Snowflake Platform Layers Cloud Services The Cloud Services layer plays a crucial role in managing and optimizing the overall functionality of the data warehouse and acts as the "brain" that orchestrates processes and resources to deliver a seamless, secure, and scalable data analysis and management experience. It's responsible for handling a wide range of tasks, from authentication and infrastructure management to metadata maintenance and query optimization. It is probably the most unknown layer, which means it is a user-friendly layer that goes unnoticed precisely because of its efficiency and simplicity. This layer offers several key features: Query processing: It receives SQL queries, parses them, and optimizes them for efficient execution, distributing the workload across its compute resources. Metadata management: It maintains metadata for the data stored that includes information about table structures, data types, and compression methods, as well as query history and performance metrics. Access control and security management: It handles user authentication, authorization, and role-based access control. It ensures that users can only access the data and perform the actions their roles permit. Transaction management: Handle the main features of transaction processing, including concurrency control and ensuring the ACID (Atomicity, Consistency, Isolation, Durability) properties of transactions. That, in conjunction with storage layer features (durability, consistency, or data versioning), is crucial for maintaining data integrity and consistency. Infrastructure management: It dynamically allocates and scales computational resources, the Virtual Warehouses, automatically scaling them up or down based on the workload. Data sharing and collaboration: It facilitates secure data sharing across different Snowflake accounts, sharing subsets of data without copying or moving the data, enabling real-time and seamless collaboration. Performance and usage monitoring: It provides tools and dashboards for monitoring the performance and usage of the Snowflake environment. Although, in my opinion, this is one of Snowflake's capabilities that can be improved. Integrations and API support: It provides support for various integrations and APIs, allowing users, applications, and tools to interact with the Snowflake platform. For example, it allows the management of all resources (compute, user management, monitoring, or security) following an as-code approach. Compute Layer This layer is composed of virtual warehouses that are essentially compute clusters and are responsible for executing SQL queries on the data stored in Snowflake. It supports creating multiple virtual warehouses to handle and distribute your workloads. This enables us to create dedicated and sized resources for each scenario or actor. 
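To make this concrete, here is a hedged sketch of how dedicated warehouses might be created for two teams; the names, sizes, and settings are assumptions for illustration, and multi-cluster warehouses additionally require the appropriate Snowflake edition.
SQL
-- Separately sized warehouses for BI dashboards and heavier data science workloads.
CREATE WAREHOUSE bi_wh
  WITH WAREHOUSE_SIZE = 'MEDIUM'
       AUTO_SUSPEND = 300   -- suspend after 300 seconds of inactivity
       AUTO_RESUME = TRUE;

CREATE WAREHOUSE data_science_wh
  WITH WAREHOUSE_SIZE = 'LARGE'
       MIN_CLUSTER_COUNT = 1
       MAX_CLUSTER_COUNT = 3   -- allow scale-out to three clusters under high concurrency
       AUTO_SUSPEND = 300
       AUTO_RESUME = TRUE;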
For example, as in the sketch above, if different squads access data concurrently from their applications and BI tools, each can be assigned its own warehouse, ensuring that heavy querying by one team doesn't affect another team's performance. Isolation: Each cluster is a component isolated from the rest and, therefore, is not affected by the load state of other clusters. Independent scaling: It supports scale-up and scale-out independently for each cluster. If you need more performance for larger queries or more users, you can increase the size of your warehouse or add more nodes using multi-clustering capabilities. Independent elasticity: It supports automatic scale-out, although vertical scaling is not automated and therefore requires manual or scripted actions. On-the-fly resizing: Scaling a virtual warehouse in Snowflake can be done on the fly without any downtime. This allows for elasticity, where you can adapt to varying workloads as needed. Multi-cluster warehouses: For even higher levels of concurrency, it enables automatic scale-out from one cluster to multiple compute clusters to accommodate many simultaneous users or queries. Storage Layer It is responsible for storing and managing data efficiently and contributes to having an effective and scalable platform. It offers several key features: Types of data: Snowflake supports structured and semi-structured data, including JSON, Avro, XML, Parquet formats, or Iceberg tables. Elastic and scalable storage: The storage layer automatically scales to accommodate data growth without manual intervention, so we do not need to worry about storage limits or provisioning additional storage space. Optimized data storage format: It stores data in an optimized columnar format or in row format in the case of Unistore tables, which can be indexed like traditional OLTP engines. This optimizes storage for each data use case. Data clustering and micro-partitions: Snowflake automatically organizes data into micro-partitions, which are internally optimized and compressed to improve query performance in terms of time and compute resources. Time travel and fail-safe features: It provides the capacity to access historical data up to a certain point in the past at the table level. This allows us to revert to previous data states within a specified time window, providing data protection and ensuring data integrity or enabling historical data analysis (a one-line example appears at the end of this article). The fail-safe feature offers additional protection by maintaining the data for a set period for disaster recovery. Data sharing: Snowflake enables secure and easy sharing of data between different Snowflake accounts. This feature allows organizations to share live, ready-to-query data with partners and customers without moving or copying data, ensuring data governance and security. Security and compliance: It provides several security features, including encryption of data at rest and in transit, role-based access control, and compliance with various industry standards and regulations. Cost-effective storage: We pay only for the storage we use, with Snowflake compressing and storing data in a cost-efficient manner. Conclusions In this series of articles, we will explore the various ways in which Snowflake can be used to address a wide range of data challenges. We will start with the basics of SQL and how to use it to query data in Snowflake. We will then move on to more advanced topics such as data modeling, query optimization, and machine learning.
Before embarking on any project, it is crucial to understand its underlying architecture, capabilities, and limitations. Failure to understand the nuances of products and platforms can lead to inefficiencies, performance bottlenecks, excessive costs, and potential security vulnerabilities. This is precisely the purpose of this first article, to understand Snowflake's architecture and fundamental features.
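As a brief closing illustration of the time travel capability mentioned in the storage layer section, a query like the following reads a table as it existed one hour ago; the table name and offset are hypothetical.
SQL
-- Time travel: query the table state as of 3600 seconds (one hour) ago.
SELECT * FROM sales AT (OFFSET => -3600);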
In today's data-driven landscape, organizations are increasingly turning to robust solutions like AWS Data Lake to centralize vast amounts of structured and unstructured data. AWS Data Lake, a scalable and secure repository, allows businesses to store data in its native format, facilitating diverse analytics and machine learning tasks. One of the popular tools to query this vast reservoir of information is Amazon Athena, a serverless, interactive query service that makes it easy to analyze data directly in Amazon S3 using standard SQL. However, as the volume of data grows exponentially, performance challenges can emerge. Large datasets, complex queries, and suboptimal table structures can lead to increased query times and costs, potentially undermining the very benefits that these solutions promise. This article delves specifically into the details of how to harness the power of partition projections to address these performance challenges. Before diving into the advanced concept of partition projections in Athena, it's essential to grasp the foundational idea of partitions, especially in the context of a data lake. What Are Partitions in AWS Data Lake? In the realm of data storage and retrieval, a partition refers to a division of a table's data based on the values of one or more columns. Think of it as organizing a vast bookshelf (your data) into different sections (partitions) based on genres (column values). By doing so, when you're looking for a specific type of book (data), you only need to search in the relevant section (partition) rather than the entire bookshelf. In a data lake, partitions are typically directories that contain data files. Each directory corresponds to a specific value or range of values from the partitioning column(s). Why Are Partitions Important? Efficiency: Without partitions, querying vast datasets would involve scanning every single file, which is both time-consuming and costly. With partitions, only the relevant directories are scanned, significantly reducing the amount of data processed. Cost Savings: In cloud environments like AWS, where you pay for the amount of data scanned, partitions can lead to substantial cost reductions. Scalability: As data grows, so does the importance of partitions. They ensure that even as your data lake swells with more data, retrieval times remain manageable. Challenges With Partitions While partitions offer numerous benefits, they aren't without challenges: Maintenance: As new data comes in, new partitions might need to be created, and existing ones might need updates. Optimal Partitioning: Too few partitions can mean you're still scanning a lot of unnecessary data. Conversely, too many partitions can lead to a large number of small files, which can also degrade performance. With this foundational understanding of partitions in a data lake, we can now delve deeper into the concept of partition projections in Athena and how they aim to address some of these challenges. What Are Partition Projections? Partition pruning is a technique where only the relevant metadata, specific to a query, is selected, eliminating unnecessary data. This method often makes queries run faster. Athena employs this strategy for all tables that have partitioned columns. In a typical scenario, when Athena processes queries, it first communicates with the AWS Glue Data Catalog by making a GetPartitions request, after which it performs partition pruning. However, if a table has an extensive set of partitions, this call can slow things down. 
To avoid this expensive operation on a highly partitioned table, AWS has introduced the technique of partition projections. With partition projection, Athena doesn't need to make the GetPartitions call. Instead, the configuration provided in the partition projection settings equips Athena with everything it needs to compute the partitions on its own.

Benefits of Partition Projections
Improved Query Performance: By reducing the amount of data scanned, queries run faster and more efficiently.
Reduced Costs: With Athena, you pay for the data you scan. By scanning less data, costs are minimized.
Simplified Data Management: Virtual partitions eliminate the need for continuous partition maintenance tasks, such as adding new partitions when new data arrives.

Setting Up Partition Projections
To utilize partition projections:
1. Define Projection Types: Athena supports several projection types, including `integer`, `enum`, `date`, and `injected`. Each type serves a specific use case, such as generating a range of integers or dates.
2. Specify Projection Configuration: This involves defining the rules and patterns for your projections. For instance, for a date projection, you'd specify the start date, end date, and the date format.
3. Modify Table Properties: Once projections are defined, modify your table properties in Athena to use these projections (a sketch of attaching these properties to an existing table appears at the end of this section).

An Example Use-Case
Let us take an example where our data is stored in the data lake and is partitioned by customer_id and dt. The data is stored in Parquet format, which is a columnar data format:
s3://my-bucket/data/customer_id/yyyy-MM-dd/*.parquet
In our example, we have data for one year, i.e., 365 days, and 100 customers. This results in 365 × 100 = 36,500 partitions in the data lake. Let us benchmark the queries on this table with and without partition projections enabled, getting the count of all the records for the entire year for five customers.

Query
SQL
SELECT count(*)
FROM "analytic_test"."customer_events"
WHERE dt >= '2022-01-01'
  AND customer_id IN ('Customer_001', 'Customer_002', 'Customer_003', 'Customer_004', 'Customer_005')

Without Partition Projection
Without partition projections enabled, the total query runtime is 7.3 seconds. Out of that, the query spends 78% of the time in planning and 20% in execution.
Query Results
Planning: 78% (5.6 seconds)
Execution: 20% (1.46 seconds)

With Partition Projections
Now, let us enable partition projection for this table. Take a look at all the table properties prefixed with "projection." in the DDL below. Since we have two partition columns, dt and customer_id, we will use a date type projection for dt and an enum type projection for customer_id. For enum types, you can build an automation job to update the table property whenever there are new values for it.
SQL
CREATE EXTERNAL TABLE `customer_events`(
  `event_id` bigint COMMENT '',
  `event_text` string COMMENT '')
PARTITIONED BY (
  `customer_id` string COMMENT '',
  `dt` string COMMENT '')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://my-bucket/data/events.customer_events'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'parquet.compression'='SNAPPY',
  'transient_lastDdlTime'='1698737433',
  'projection.enabled'='true',
  'projection.dt.type'='date',
  'projection.dt.range'='NOW-1YEARS,NOW',
  'projection.dt.format'='yyyy-MM-dd',
  'projection.dt.interval'='1',
  'projection.dt.interval.unit'='DAYS',
  'projection.customer_id.type'='enum',
  'projection.customer_id.values'='Customer_001,Customer_002,Customer_003,Customer_004,Customer_005,Customer_006,Customer_007,Customer_008,Customer_009,Customer_010,Customer_011,Customer_012,Customer_013,Customer_014,Customer_015,Customer_016,Customer_017,Customer_018,Customer_019,Customer_020,Customer_021,Customer_022,Customer_023,Customer_024,Customer_025,Customer_026,Customer_027,Customer_028,Customer_029,Customer_030,Customer_031,Customer_032,Customer_033,Customer_034,Customer_035,Customer_036,Customer_037,Customer_038,Customer_039,Customer_040,Customer_041,Customer_042,Customer_043,Customer_044,Customer_045,Customer_046,Customer_047,Customer_048,Customer_049,Customer_050,Customer_051,Customer_052,Customer_053,Customer_054,Customer_055,Customer_056,Customer_057,Customer_058,Customer_059,Customer_060,Customer_061,Customer_062,Customer_063,Customer_064,Customer_065,Customer_066,Customer_067,Customer_068,Customer_069,Customer_070,Customer_071,Customer_072,Customer_073,Customer_074,Customer_075,Customer_076,Customer_077,Customer_078,Customer_079,Customer_080,Customer_081,Customer_082,Customer_083,Customer_084,Customer_085,Customer_086,Customer_087,Customer_088,Customer_089,Customer_090,Customer_091,Customer_092,Customer_093,Customer_094,Customer_095,Customer_096,Customer_097,Customer_098,Customer_099,Customer_100')

Query Results
Planning: 1.69 seconds
Execution: 0.6 seconds

Results
We can see a roughly 70% improvement in query performance. This is because Athena avoids a remote call to AWS Glue to fetch the partitions; with this feature, it is able to project the values for these partitions itself.

Limitations and Considerations
While powerful, partition projections do not solve every problem:
Complex Setups: Setting up projections requires a deep understanding of your data and the patterns it follows.
Not Always the Best Fit: For datasets that don't follow predictable patterns or have irregular updates, traditional partitioning might be more suitable.

Conclusion
AWS's introduction of partition projections in Athena is a testament to its commitment to improving user experience and efficiency. By leveraging this feature, organizations can achieve faster query performance with minimal configuration changes. As with all tools, understanding its strengths and limitations is key to harnessing its full potential.
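One practical follow-on to the example above, offered as a hedged sketch rather than part of the original benchmark: if the table already exists, the same projection properties can generally be attached without recreating it by using Athena's ALTER TABLE ... SET TBLPROPERTIES statement. The property values below mirror the DDL shown earlier, with the customer_id enum list truncated for readability; a statement of this form is also what the automation job mentioned above could run to refresh the enum values as new customers arrive.

SQL
-- Sketch: attach partition projection properties to an existing table.
-- Values mirror the example DDL; the enum list is truncated here for readability
-- and would carry the full set of customer IDs in practice.
ALTER TABLE `customer_events` SET TBLPROPERTIES (
  'projection.enabled'='true',
  'projection.dt.type'='date',
  'projection.dt.range'='NOW-1YEARS,NOW',
  'projection.dt.format'='yyyy-MM-dd',
  'projection.dt.interval'='1',
  'projection.dt.interval.unit'='DAYS',
  'projection.customer_id.type'='enum',
  'projection.customer_id.values'='Customer_001,Customer_002,Customer_003'
);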
IoT developers know how important it is to find not just what works but what works best for a particular IoT use case. After all, a smart proximity sensor in an IoT security system is very different from a smartwatch you might use to monitor your health, take calls, send messages, and more. The same goes for development boards, which are a central part of any IoT device. Each one comes with its own set of specifications and pros and cons. Learn how to find the best boards for your specific project and what the differences are between them in this guide.

What Are IoT Development Boards?
Any IoT product will likely have gone through various iterations before it finally hits the shelves. Developers will make changes like adding or removing features, altering the user interface based on consumer feedback, and so on. Unlike a market-ready IoT device, in which the circuit board and electronics inside are hidden by the outer plastic, a development board has everything you need to configure an IoT device exposed for easy access, so developers can test out different settings and make changes on the fly. Development boards are simply printed circuit boards with either a microcontroller or a microprocessor mounted on them, along with various metal pins that an experienced developer can use to develop, prototype, and iterate different versions of various smart products. They're also used by hobbyists to become more familiar with development concepts and hardware. IoT developers can also use the boards to develop a Minimum Viable Product (MVP). The purpose of an MVP is to attract early adopters, show the potential of a final product design, and gain both funding and interest. The MVP stage involves making changes based on initial user feedback, so it's especially important to have full access to the hardware. Finally, a development board provides the perfect staging ground for trying out a new software product, app, or IoT platform prior to rolling it out on an entire line of smart products.

What Are the Different IoT Development Board Categories?
The major categories within IoT development boards are Field Programmable Gate Arrays (FPGA), Application-Specific Integrated Circuits (ASIC), and Single Board Computers (SBC). Let's look at each in turn. You can think of an FPGA as a book that hasn't been written yet: it doesn't have a specific purpose, and it's up to you as the writer to decide what its purpose will be. FPGAs leave all of the configuration up to you. The "field-programmable" part also means that you can "re-write" the purpose of the device at any time. That makes them best suited for the prototyping stages, in which you might need to make changes on the fly or develop new functionality for the device. FPGAs are best used by experienced developers since they don't come with the hardware pre-configured. If you want to use an FPGA development board properly, you'll have to learn a Hardware Description Language (HDL). Learning an HDL has been compared to learning complex programming languages like C. HDLs help the developer know how the device will respond to various configurations. An ASIC development board, unlike an FPGA, comes pre-programmed with a specific purpose or purposes in mind, which leaves a lot less work to the developer. However, an ASIC board isn't always best for prototyping because you can't make major changes to the configuration. Finally, an SBC development board is a full-fledged computer that comes on a single board.
The Raspberry Pi might be the best-known example. An SBC has a lot of processing power. While it's nowhere near as customizable as an FPGA, it will generally have enough different input and output (I/O) options to apply to many different IoT use cases. However, SBCs also require a lot more energy than other development board options. And although their physical size is relatively small, they're still far too large for, say, a simple smart temperature sensor.

Important Features of Development Boards
FPGA, ASIC, and SBC development boards have everything you need for an IoT device in the form of a processor, an operating system, input and output options, and more. But they differ in what type of processor they use. An FPGA might use either a microprocessor (MPU) or a microcontroller (MCU), whereas an ASIC will generally use one or more MCUs. An SBC, on the other hand, generally relies on an MPU. The reason for these differences is that a microprocessor is higher-powered and suitable for performing multiple tasks rather than just one. It has a full operating system, generally Linux-based, and tends to cost more than an MCU as a result. By contrast, a microcontroller is a low-powered device that's better suited for the kind of single task an ASIC might perform. MCUs rely on something called task switching to provide a real-time response to input: the MCU decides which task to perform first based on priority rather than trying to perform multiple tasks at the same time. Since the MCU can focus all of its processing power on one task, that task gets finished very quickly. So, in manufacturing, where a machine might need to turn off within a millisecond of reaching a dangerous temperature in order to avoid a fire, an MCU often provides the best performance. For these reasons, you always need to know whether the development board you plan to buy has an MCU or an MPU. Here are some other features to look out for as well:

Open-Source Hardware (OSHW)
OSHW is just what it sounds like. Just as there are open-source software designs that allow you to see and freely change the code at will, there are open-source hardware development boards that let you freely use and change the configuration as needed. Arduino development boards are one example.

Available Ports
When you start to develop a new IoT device, determine what kinds of input and output ports you will need for your specific application. For example, do you want to allow USB connectivity? Do you need an HDMI port for video streaming?

Operating System
An SBC will generally use Linux, but what about an ASIC development board? ASICs often rely on a real-time operating system (RTOS), a simplified operating system that performs a single task at a time and switches between tasks. There are also ASICs that have a tick scheduler rather than an OS; a tick scheduler simply repeats a single task at preset intervals.

Connectivity Options
These days, one of the most popular features of any IoT device is mobile connectivity. In other words, most consumers want the ability to view the status of their IoT devices remotely and control those devices right from a smartphone. For those purposes, you should examine whether the development board has Bluetooth, wireless internet, 4G, or 5G connectivity.

Random Access Memory
If you work with computers much, you're already familiar with the concept of RAM.
While storage memory refers to how much data you can keep on your device, RAM refers to temporary memory that's used to perform tasks at any given time. So, more RAM means the device can perform more tasks, or a single task with more processing power. Your cost will go up as RAM increases, so for simple tasks, less RAM is often actually better.

Processor
Choosing the best processor is far too complicated to cover in detail here, but it's enough to know that three of the top processor options are ARM, Intel x86, and AMD. Intel x86 processors are used in most personal computers for their speed and processing power, but they use more energy as a result. AMD processors have even more processing power on average, whereas ARM processors have simpler instruction sets and require less power. ARM processors also tend to be physically smaller, meaning they fit better in size-constrained IoT devices. So, in low-power environments, ARM is often the best option.

Examples of IoT Development Boards
Some of the top IoT development boards come from the Espressif ESP32 family and the Renesas RA MCU series. Other top providers include NXP, Texas Instruments, STMicro, and Microchip, among others. ESP32 development boards come in various sizes and levels of processing power. In general, they are low-cost, MCU-based, and low-power, which is ideal for energy-constrained applications. They support Wi-Fi and Bluetooth and have various I/Os that allow you to configure or customize the boards as needed. Many (but not all) boards in the ESP32 family also have USB connectivity. Renesas is another top provider of IoT development boards, especially MCU boards. The RA MCU series is general-purpose, so it can be used for many different applications and configured as necessary. RA MCUs tend to be easy to use and debug and have quick-start options that let developers get started immediately. Different options within the RA series include different input and connectivity features, so you'll have to look at each individually to determine the best board for your use case. There's also the STM32 family from STMicroelectronics. These are also MCU boards and come with a huge range of features and options, from high-performance, higher-energy versions to ultra-low-power options and long-range wireless MCUs. They come in both single- and dual-core versions and are commonly used in consumer IoT devices. STM32 Nucleo boards are commonly used for prototyping, are very developer-friendly, and are compatible with a variety of development environments and platforms. Arduino is also a popular option for boards with the lowest power consumption and simplest designs. For example, the Arduino Uno is ideal for less experienced developers, and the Arduino Cloud platform makes it easy to create a connected device in minutes. There's also the Arduino Nano for the smallest IoT devices, a cheap, low-power option that offers Bluetooth connectivity. Additionally, Arduino offers its own development environment. Lastly, there are the C2000 and MSP430 MCU boards from Texas Instruments, which provides cost-efficient development board options with a huge range of memory and peripheral choices. C2000 boards are specifically designed for real-time capabilities, so they are highly suitable for industrial IoT applications and are made for performance and speed. There are over 2000 different MSP430 devices, and these are designed for quick time-to-market, easy development, flexibility, and cost efficiency.
Texas Instruments also offers starter kits, comprehensive documentation, and software resources.

Wrapping Up
A development board is normally the starting point for any new IoT project. It allows you to make changes, adjust to feedback, and produce the best possible device for your particular use case. Now you can go in fully informed, ready to find and test out the ideal development board for your project.