Also known as the build stage of the SDLC, coding focuses on the writing and programming of a system. The Zones in this category take a hands-on approach to equip developers with the knowledge about frameworks, tools, and languages that they can tailor to their own build needs.
A framework is a collection of reusable code that supports development by providing ready-made components. Frameworks establish architectural patterns and structures, which helps speed up the development process. This Zone contains helpful resources for developers to learn about and further explore popular frameworks such as the Spring framework, Drupal, Angular, Eclipse, and more.
Java is an object-oriented programming language that allows engineers to produce software for multiple platforms. Our resources in this Zone are designed to help engineers with Java program development, Java SDKs, compilers, interpreters, documentation generators, and other tools used to produce a complete application.
JavaScript (JS) is an object-oriented programming language that allows engineers to produce and implement complex features within web browsers. JavaScript is popular because of its versatility and is often the default choice for front-end development unless a project calls for a more specialized language. In this Zone, we provide resources that cover popular JS frameworks, server applications, supported data types, and other useful topics for a front-end engineer.
Programming languages allow us to communicate with computers, and they operate like sets of instructions. There are numerous types of languages, including procedural, functional, object-oriented, and more. Whether you’re looking to learn a new language or trying to find some tips or tricks, the resources in the Languages Zone will give you all the information you need and more.
Development and programming tools are used to build frameworks, and they can be used for creating, debugging, and maintaining programs — and much more. The resources in this Zone cover topics such as compilers, database management systems, code editors, and other software tools and can help ensure engineers are writing clean code.
Build Your Own GitHub-Like Tool With React in One Hour
Creating Scrolling Text With HTML, CSS, and JavaScript
This table in Python contains a list of code smells and the design patterns that address them.

Python

class CodeSmells:
    Duplicated_Code = [
        form_template_method,
        introduce_polymorphic_creation_with_factory_method,
        chain_constructors,
        replace_one__many_distinctions_with_composite,
        extract_composite,
        unify_interfaces_with_adapter,
        introduce_null_object,
    ]
    Long_Method = [
        compose_method,
        move_accumulation_to_collecting_parameter,
        replace_conditional_dispatcher_with_command,
        move_accumulation_to_visitor,
        replace_conditional_logic_with_strategy,
    ]
    Conditional_Complexity = [  # Complicated conditional logic
        replace_conditional_logic_with_strategy,
        move_embellishment_to_decorator,
        replace_state_altering_conditionals_with_state,
        introduce_null_object,
    ]
    Primitive_Obsession = [
        replace_type_code_with_class,
        replace_state_altering_conditionals_with_state,
        replace_conditional_logic_with_strategy,
        replace_implicit_tree_with_composite,
        replace_implicit_language_with_interpreter,
        move_embellishment_to_decorator,
        encapsulate_composite_with_builder,
    ]
    # Lack of "information hiding" [Parnas]
    Indecent_Exposure = [encapsulate_classes_with_factory]
    # The logic/responsibility is sprawled across multiple places
    # (classes, methods)
    Solution_Sprawl = [move_creation_knowledge_to_factory]
    # [Fowler and Beck] Interfaces of classes differ,
    # but the classes are similar
    Alternative_Classes_with_Different_Interfaces = unify_interfaces_with_adapter
    # [Fowler and Beck] A class that doesn't do enough to pay for itself
    Lazy_Class = [inline_singleton]
    Large_Class = [
        replace_conditional_dispatcher_with_command,
        replace_state_altering_conditionals_with_state,
        replace_implicit_tree_with_composite,
    ]
    Switch_Statements = [  # Complicated switches
        replace_conditional_dispatcher_with_command,
        move_accumulation_to_visitor,
    ]
    # Code that does the same thing with different types or quantities of data
    # (similar to duplication)
    Combination_Explosion = [replace_implicit_language_with_interpreter]
    # The same problem being solved in many ways in the system
    # (similar to duplication)
    Oddball_Solutions = [unify_interfaces_with_adapter]

The Journey

After nearly a year of effort, I’ve finally completed my self-imposed goal of writing all the refactoring examples from the book Refactoring to Patterns by Joshua Kerievsky in Python. This book broadened my understanding of how to apply design patterns in production code. Each example includes a brief explanation of the original code and its context, followed by the refactored code and the benefits gained through the refactoring. For instance, the refactoring to "Compose Method" transforms difficult-to-read code into a simple, streamlined implementation. Let's take this example and examine the original code.

Python

# Original code
# It is not easy to understand the code
def add(element):
    readonly = False
    size = 0
    elements = []
    if not readonly:
        new_size = size + 1
        if new_size > len(elements):
            new_elements = []
            for i in range(size):
                new_elements[i] = elements[i]
            elements = new_elements
        size += 1
        elements[size] = element

It is possible to see that the code is not easy to understand. It has many nested conditions and loops. Now, let's go to the refactored code.

Python

# Code refactored
# The new code has meaningful names for blocks of code and is not nested.
# The Compose Method is a refactoring that simplifies the code
def at_capacity(new_size, elements):
    return new_size > len(elements)

def grow(elements, size):
    new_elements = []
    for i in range(size):
        new_elements.append(elements[i])
    return new_elements

def add_elements(elements, element, size):
    size += 1
    elements.append(element)
    return size

def add_refac(element, elements, size):
    readonly = False
    if readonly:
        return size
    if at_capacity(size + 1, elements):
        elements = grow(elements, size)
    return add_elements(elements, element, size)

The idea of the refactoring is to reduce complexity by giving meaningful names to blocks of code and removing the nested branches. Notice that it was necessary to extract blocks of code into methods.

While working through the book and writing the examples, I had to interpret UML diagrams and understand the mechanics in detail. This required intense focus and mental effort. Many times, I had to rebuild the examples from scratch because converting code from Java to Python was not straightforward. Native Python doesn’t support cyclic imports, constructor overloads, or interfaces well, so some adaptations were necessary. I added comments on these areas to help with future consultation of the code.

Through this process, I realized my previous understanding of design patterns was mostly theoretical and limited to trivial scenarios. For example, while I understood that "Polymorphism" addresses development problems, the book showed its application in test automation by abstracting the setup phase and reusing the remaining test implementation. Here are both versions of the code. The difference between the original and new code is the setup of the test.

Python

# Original code
# Similar methods differ only in the object instantiation.
# All the rest is the same
class TestCase:
    pass

class DOMBuilder:
    def __init__(self, orders) -> None:
        pass

    def calc(self):
        return 42

class XMLBuilder:
    def __init__(self, orders) -> None:
        pass

    def calc(self):
        return 42

class DOMTest(TestCase):
    def run_dom_test(self):
        expected = 42
        builder = DOMBuilder("orders")  # different object created
        assert builder.calc() == expected

class XMLTest(TestCase):
    def run_xml_test(self):
        expected = 42
        builder = XMLBuilder("orders")  # different object created
        assert builder.calc() == expected

# Code refactored
# The instantiation of the DOMBuilder or XMLBuilder is the only difference
# between both tests.
# An OutputBuilder class was created as an interface for both classes
# (it is not strictly necessary given that Python uses duck typing).
# In TestCase, a new method called "create_builder" was introduced to be
# implemented by the child classes.
# This is the step resolved at runtime for each type of test. This is the
# polymorphism. When both tests (DOMTest and XMLTest) are executed,
# the instance returned from "create_builder" depends on the
# implementation. It can be DOMBuilderRefac or XMLBuilderRefac.
class OutputBuilder:
    def calc(self):
        raise NotImplementedError()

class DOMBuilderRefac(OutputBuilder):
    def calc(self):
        return 42

class XMLBuilderRefac(OutputBuilder):
    def calc(self):
        return 42

class TestCaseRefac:
    def create_builder(self):
        raise NotImplementedError()

    def run_test(self):
        expected = 42
        builder = self.create_builder()  # different object created
        assert builder.calc() == expected

class DOMTestRefac(TestCaseRefac):
    def create_builder(self) -> OutputBuilder:
        return DOMBuilderRefac()

class XMLTestRefac(TestCaseRefac):
    def create_builder(self):
        return XMLBuilderRefac()

def run():
    dom_tc = DOMTestRefac()
    dom_tc.run_test()
    xml_tc = XMLTestRefac()
    xml_tc.run_test()

The "Visitor" pattern was the most difficult for me to understand.
I read about the pattern in the Design Patterns book before attempting the refactoring. It was only after seeing the original (unrefactored) code being transformed into the new version that I realized the pattern isn’t as complex as it initially seems. Essentially, the pattern decouples classes from their methods. Again, both versions of the code are shown for comparison. The implementation of the pattern is "by the book."

Python

# Original code
# The TextExtractor has lots of conditions to handle Nodes, like StringNode
# The idea of the refactoring is to distribute the logic into Visitor classes

# Interface
class Node:
    pass

class LinkTag(Node):
    pass

class Tag(Node):
    pass

class StringNode(Node):
    pass

class TextExtractor:
    def extract_text(self, nodes: list[Node]):
        result = []
        for node in nodes:
            if isinstance(node, StringNode):
                result.append("string")
            elif isinstance(node, LinkTag):
                result.append("linktag")
            elif isinstance(node, Tag):
                result.append("tag")
            else:
                result.append("other")
        return result

# Code refactored
# Interface (the visitor)
class NodeVisitorRefac:
    def visit_link_tag(self, node):
        return "linktag"

    def visit_tag(self, node):
        return "tag"

    def visit_string_node(self, node: object):
        return "string"

class NodeRefac:
    def accept(self, visitor: NodeVisitorRefac):
        pass

class LinkTagRefac(NodeRefac):
    def accept(self, visitor: NodeVisitorRefac):
        return visitor.visit_link_tag(self)

class TagRefac(NodeRefac):
    def accept(self, visitor: NodeVisitorRefac):
        return visitor.visit_tag(self)

class StringNodeRefac(NodeRefac):
    def accept(self, visitor: NodeVisitorRefac):
        return visitor.visit_string_node(self)

# The concrete visitor
class TextExtractorVisitorRefac(NodeVisitorRefac):
    def extract_text(self, nodes: list[NodeRefac]):
        result = []
        for node in nodes:
            result.append(node.accept(self))
        return result

def run_refac():
    # The original object calling its method
    result1 = TextExtractor().extract_text([StringNode()])
    # The new object accepting visitors
    result2 = TextExtractorVisitorRefac().extract_text([StringNodeRefac()])
    return result1, result2

Conclusion

I highly recommend the book to everyone. The first time I read it, I found it boring and difficult to grasp the concepts just by following static code examples. However, when you actively write the code, the ideas gradually come to life. Errors will occur, and addressing them requires understanding the underlying concepts. This process transforms theory into practice and solidifies your knowledge.
End-to-end tests are essential for ensuring the reliability of your application, but they can also be a source of frustration. Even small changes to the user interface can cause tests to fail, leading developers and QA teams to spend hours troubleshooting. In this blog post, I’ll show you how to utilize AI tools like ChatGPT or Copilot to automatically fix Playwright tests. You’ll learn how to create an AI prompt for any test that fails and attach it to your HTML report. This way, you can easily copy and paste the prompt into AI tools for quick suggestions on fixing the test. Join me to streamline your testing process and improve application reliability! Let’s dive in!

Plan

The solution comes down to three simple steps:

1. Identify when a Playwright test fails.
2. Create an AI prompt with all the necessary context: the error message, a snippet of the test code, and an ARIA snapshot of the page.
3. Integrate the prompt into the Playwright HTML report.

By following these steps, you can enhance your end-to-end testing process and make fixing Playwright tests a breeze.

Step-by-Step Guide

Step 1: Detecting a Failed Test

To detect a failed test in Playwright, you can create a custom fixture that checks the test result during the teardown phase, after the test has completed. If there’s an error in testInfo.error and the test won't be retried, the fixture will generate a helpful prompt. Check out the code snippet below:

JavaScript

import { test as base } from '@playwright/test';
import { attachAIFix } from '../../ai/fix-with-ai'

export const test = base.extend({
  fixWithAI: [async ({ page }, use, testInfo) => {
    await use()
    await attachAIFix(page, testInfo)
  }, { scope: 'test', auto: true }]
});

Step 2: Building the Prompt

Prompt Template

I'll start with a simple proof-of-concept prompt (you can refine it later):

You are an expert in Playwright testing. Your task is to fix the error in the Playwright test titled "{title}".
- First, provide a highlighted diff of the corrected code snippet.
- Base your fix solely on the ARIA snapshot of the page.
- Do not introduce any new code.
- Avoid adding comments within the code.
- Ensure that the test logic remains unchanged.
- Use only role-based locators such as getByRole, getByLabel, etc.
- For any 'heading' roles, try to adjust the heading level first.
- At the end, include concise notes summarizing the changes made.
- If the test appears to be correct and the issue is a bug on the page, please note that as well.

Input:
{error}

Code snippet of the failing test:
{snippet}

ARIA snapshot of the page:
{ariaSnapshot}

Let’s fill the prompt with the necessary data.

Error Message

Playwright stores the error message in testInfo.error.message. However, it includes special ASCII control codes for coloring output in the terminal (such as [2m or [22m):

TimeoutError: locator.click: Timeout 1000ms exceeded.
Call log:
[2m  - waiting for getByRole('button', { name: 'Get started' })[22m

After investigating Playwright’s source code, I found a stripAnsiEscapes function that removes these special symbols:

JavaScript

const clearedErrorMessage = stripAnsiEscapes(testInfo.error.message);

Cleared error message:

TimeoutError: locator.click: Timeout 1000ms exceeded.
Call log:
  - waiting for getByRole('button', { name: 'Get started' })

This cleaned-up message can be inserted into the prompt template.

Code Snippet

The test code snippet is crucial for AI to generate the necessary code changes.
Playwright often includes these snippets in its reports, for example:

   4 | test('get started link', async ({ page }) => {
   5 |   await page.goto('https://playwright.dev');
>  6 |   await page.getByRole('button', { name: 'Get started' }).click();
     |                                                            ^
   7 |   await expect(page.getByRole('heading', { level: 3, name: 'Installation' })).toBeVisible();
   8 | });

You can see how Playwright internally generates these snippets. I’ve extracted the relevant code into a helper function, getCodeSnippet(), to retrieve the source code lines from the error stack trace:

const snippet = getCodeSnippet(testInfo.error);

ARIA Snapshot

ARIA snapshots, introduced in Playwright 1.49, provide a structured view of the page’s accessibility tree. Here’s an example ARIA snapshot showing the navigation menu on the Playwright homepage:

- document:
  - navigation "Main":
    - link "Playwright logo Playwright":
      - img "Playwright logo"
      - text: Playwright
    - link "Docs"
    - link "API"
    - button "Node.js"
    - link "Community"
    ...

While ARIA snapshots are primarily used for snapshot comparison, they are also a game-changer for AI prompts in web testing. Compared to raw HTML, ARIA snapshots offer:

Small size → Less risk of hitting prompt limits
Less noise → Less unnecessary context
Role-based structure → Encourages AI to generate role-based locators

Playwright provides .ariaSnapshot(), which you can call on any element. For AI to fix a test, it makes sense to include the ARIA snapshot of the entire page retrieved from the root <html> element:

JavaScript

const ariaSnapshot = await page.locator('html').ariaSnapshot();

Assembling the Prompt

Finally, combine all the pieces into one prompt:

JavaScript

const errorMessage = stripAnsiEscapes(testInfo.error.message);
const snippet = getCodeSnippet(testInfo.error);
const ariaSnapshot = await page.locator('html').ariaSnapshot();

const prompt = promptTemplate
  .replace('{title}', testInfo.title)
  .replace('{error}', errorMessage)
  .replace('{snippet}', snippet)
  .replace('{ariaSnapshot}', ariaSnapshot);

Example of the generated prompt:

Step 3: Attach the Prompt to the Report

When the prompt is built, you can attach it to the test using testInfo.attach:

TypeScript

export async function attachAIFix(page: Page, testInfo: TestInfo) {
  const willRetry = testInfo.retry < testInfo.project.retries
  if (testInfo.error && !willRetry) {
    const prompt = generatePrompt({
      title: testInfo.title,
      error: testInfo.error,
      ariaSnapshot: await page.locator('html').ariaSnapshot(),
    });
    await testInfo.attach('AI Fix: Copy below prompt and paste to Github Copilot Edits to see the magic', { body: prompt })
  }
}

Now, whenever a test fails, the HTML report will include an attachment labeled "Fix with AI."

Fix Using Copilot Edits

When it comes to using ChatGPT for fixing tests, you typically have to manually implement the suggested changes. However, you can make this process much more efficient by using Copilot. Instead of pasting the prompt into ChatGPT, simply open the Copilot Edits window in VS Code and paste your prompt there. Copilot will then recommend code changes that you can quickly review and apply — all from within your editor. Check out this demo video of fixing a test with Copilot in VS Code:

Integrating "Fix with AI" into Your Project

Vitaliy Potapov created a fully working GitHub repository demonstrating the "Fix with AI" workflow. Feel free to explore it, run tests, check out the generated prompts, and fix errors with AI help.
To integrate the "Fix with AI" flow into your own project, follow these steps:

1. Ensure you’re on Playwright 1.49 or newer.
2. Copy the fix-with-ai.ts file into your test directory.
3. Register the AI-attachment fixture:

JavaScript

import { test as base } from '@playwright/test';
import { attachAIFix } from '../../ai/fix-with-ai'

export const test = base.extend({
  fixWithAI: [async ({ page }, use, testInfo) => {
    await use()
    await attachAIFix(page, testInfo)
  }, { scope: 'test', auto: true }]
});

4. Run your tests and open the HTML report to see the “Fix with AI” attachment under any failed test.

From there, simply copy and paste the prompt into ChatGPT or GitHub Copilot, or use Copilot’s edits mode to automatically apply the code changes.

Relevant Links

Fully-working GitHub repository
Originally written by Vitaliy Potapov: https://dev.to/vitalets/fix-with-ai-button-in-playwright-html-report-2j37

I’d love to hear your thoughts or prompt suggestions for making the “Fix with AI” process even more seamless. Feel free to share your feedback in the comments. Thanks for reading, and happy testing with AI!
Effective error management is paramount to the success of any continuous integration and continuous delivery (CI/CD) pipeline. Jenkins, being a widely adopted tool for automating software deployment workflows, introduces numerous complexities when managing errors across its stages. Centralized error codes, coupled with detailed error descriptions and a structured troubleshooting guide, significantly enhance the efficiency of identifying, resolving, and mitigating issues. This article explores the implementation of a centralized error code system and integrates a fail-fast strategy to enable rapid issue detection and resolution. The goal is to provide a robust framework for organizations to ensure streamlined Jenkins pipeline operations and mitigate delays caused by errors in critical pipeline stages.

Introduction

As modern software delivery pipelines become more intricate, the role of error management within Jenkins CI/CD workflows cannot be overstated. When an error occurs, especially in a complex, multi-stage pipeline, identifying and resolving it swiftly is imperative to avoid downstream failures and costly delays. Implementing a centralized error code system ensures consistency across error reporting and facilitates quicker resolution. Coupled with a fail-fast strategy, this methodology enables organizations to address issues early, preventing them from propagating through the pipeline and ensuring minimal disruption to the development lifecycle.

The combination of centralized error codes with a fail-fast strategy is designed to accelerate debugging and minimize the impact of failures across Jenkins jobs, such as build, test, and deployment. This approach enhances the resilience and scalability of the pipeline, ensuring that critical stages are not overlooked and that errors are detected at the earliest possible point.

Centralized Error Codes in Jenkins CI/CD Pipelines

A centralized error code system involves standardizing error codes that provide concise, meaningful feedback for each failure within Jenkins pipelines. This method minimizes ambiguity and reduces troubleshooting time, ensuring that both developers and DevOps engineers can quickly address issues.

Key Advantages of Centralized Error Codes

Consistency in reporting: Centralized error codes ensure uniformity in how errors are identified and addressed across all Jenkins stages. This promotes a streamlined approach to managing failure scenarios, especially in larger, distributed teams.
Efficiency in troubleshooting: Standardized error codes allow for fast diagnostics. When an error occurs, the error code leads directly to relevant documentation that outlines causes and resolutions, eliminating the need to sift through verbose logs.
Enhanced communication: By utilizing a common language of error codes, different stakeholders (e.g., developers, QA engineers, DevOps teams) can communicate more effectively regarding failure scenarios.
Scalability: As Jenkins pipelines grow in complexity, error code centralization scales, ensuring that error management remains consistent and effective across an expanding set of jobs and stages.

Structuring Centralized Error Codes

The error code system should be structured with consistency and scalability in mind.
Below is an example schema for categorizing errors based on pipeline stages:

Prefix: The prefix (e.g., BUILD, DEPLOY, TEST) represents the stage where the error occurred.
Numeric identifier: The numeric identifier (e.g., 001, 404, 500) uniquely distinguishes each error within its category.
Severity: A severity level is assigned (e.g., CRITICAL, WARNING, ERROR) to help prioritize responses.

Example Error Code Table

Error Code | Error Message | Root Cause | Resolution Steps | Severity | Impact
BUILD-001 | Build failed due to missing dependency | Dependency not found in the package repository | Check the package.json file to ensure the dependency is included | Critical | Build fails
DEPLOY-404 | Deployment failed due to missing Docker image | Docker image not found in registry | Ensure the Docker image exists in the registry, and rebuild if necessary | Critical | Deployment halted
TEST-500 | Test execution error | Unhandled exception in test suite | Review unit test logs for exception details and update the test suite | Error | Test suite fails
BUILD-103 | Code linting error | Code violates linting rules | Resolve linting issues by adhering to the coding standards | Warning | Build warning

Fail-Fast Strategy in Jenkins Pipelines

The fail-fast strategy emphasizes halting the pipeline immediately when an error is detected, thereby preventing the error from propagating through subsequent stages. This approach enhances pipeline efficiency by minimizing resource usage and providing faster feedback to developers.

Benefits of Fail-Fast in Jenkins Pipelines

Early detection of failures: By detecting issues early in the pipeline, fail-fast ensures that problems are identified before they affect later stages, such as deployment or production integration.
Prevention of cascading failures: A fail-fast approach prevents downstream failures by halting the pipeline immediately after the first error is detected, ensuring that subsequent stages do not run in an erroneous environment.
Optimal resource allocation: It saves computational resources by avoiding the execution of unnecessary steps when an issue is already detected, improving the overall efficiency of the CI/CD pipeline.
Faster feedback for developers: With rapid feedback on failures, developers can address issues promptly, improving the velocity of development.

Implementing Fail-Fast in Jenkins Scripted Pipelines

Jenkins pipelines, whether declarative or scripted, support mechanisms for fail-fast behavior. This functionality can be configured programmatically in scripted pipelines using error-handling techniques.

Example of Fail-Fast in Jenkins Scripted Pipeline

In a scripted pipeline, you can handle fail-fast behavior by using a try-catch block to catch errors and immediately stop the pipeline when a failure occurs.

Groovy

node {
    try {
        stage('Build') {
            echo 'Building project...'
            sh 'npm install' // Trigger the build command
        }
        stage('Test') {
            echo 'Running tests...'
            sh 'npm test' // Execute unit tests
        }
        stage('Deploy') {
            echo 'Deploying to environment...'
            sh 'deploy.sh' // Execute deployment command
        }
    } catch (Exception e) {
        // Catch any exception, set the build result as failure, and halt the pipeline
        currentBuild.result = 'FAILURE'
        echo "Pipeline failed due to: ${e.getMessage()}"
        throw e // Fail the pipeline immediately
    }
}

In the above scripted pipeline, the try-catch block ensures that as soon as an error occurs in any stage (e.g., Build, Test, Deploy), the pipeline will immediately terminate. The throw command ensures that the pipeline exits as soon as an error is caught.
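For teams using declarative pipelines, the equivalent behavior needs less ceremony, since a failed stage automatically skips the remaining stages. The sketch below is a minimal, hypothetical example (the stage names, shell commands, and the BUILD-001 code are illustrative assumptions, not taken from the article) showing how a centralized error code could also be surfaced in a post block.

Groovy

pipeline {
    agent any
    stages {
        stage('Build') {
            steps {
                echo 'Building project...'
                sh 'npm install'
            }
        }
        stage('Test') {
            steps {
                echo 'Running tests...'
                sh 'npm test'
            }
        }
    }
    post {
        failure {
            // Surface a centralized error code so logs are easy to correlate
            echo 'BUILD-001: a pipeline stage failed - see the centralized error code table'
        }
    }
}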
Detailed Handling for Fail-Fast Implementation

Additionally, you can implement specific fail-fast logic for each stage, where different steps may trigger their own failure conditions, depending on the severity.

Groovy

node {
    try {
        stage('Build') {
            echo 'Building project...'
            sh 'npm install'
        }
        stage('Test') {
            echo 'Running tests...'
            sh 'npm test'
        }
        stage('Deploy') {
            echo 'Deploying to environment...'
            sh 'deploy.sh'
        }
    } catch (Exception e) {
        if (e.getMessage().contains('Build failed')) {
            currentBuild.result = 'FAILURE'
            echo 'Critical error during build, aborting pipeline.'
            throw e
        } else {
            currentBuild.result = 'UNSTABLE'
            echo 'Non-critical error, proceeding with remaining stages.'
        }
    }
}

In this example, the pipeline ensures that if the Build stage fails, it immediately stops the pipeline. However, if an issue occurs during the Test or Deploy stages that is non-critical, the pipeline marks the build as UNSTABLE and finishes with a warning rather than failing outright.

Troubleshooting Guide for Jenkins Pipelines

A structured troubleshooting guide is essential for minimizing downtime when an error occurs in Jenkins. With centralized error codes, the troubleshooting process becomes more efficient by providing clear instructions on how to resolve the identified issues.

Troubleshooting Steps Using Centralized Error Codes

Identify the error code: In Jenkins, the error code is typically output along with the failure message in the build logs. This allows users to quickly locate the failure point.
Reference the error description: Once the error code is identified, the corresponding error message and resolution steps should be referenced in the error code documentation to understand the underlying issue.
Prioritize resolution: Based on the severity of the error code (e.g., CRITICAL, ERROR), prioritize addressing high-impact errors that halt the pipeline execution.
Review build logs: Examine the Jenkins build logs for further information regarding the failure. The logs may contain specific details, such as missing dependencies, invalid configurations, or failed commands.
Apply resolution steps: Follow the provided resolution steps associated with the error code. Common resolutions might include fixing configuration files, updating dependencies, or modifying pipeline scripts.
Test the fix: After applying the resolution, trigger a re-run of the Jenkins pipeline to ensure the issue is resolved and the pipeline executes successfully.
Update documentation: After troubleshooting, if any insights or modifications to the error code system were discovered, update the centralized error code documentation to reflect these changes.

Conclusion

In modern Jenkins CI/CD pipelines, centralized error codes and a fail-fast strategy are indispensable for minimizing downtime and accelerating the resolution of issues. By establishing a clear, standardized error reporting framework, teams can improve their ability to diagnose and address failures efficiently. The integration of fail-fast principles ensures that Jenkins pipelines operate at peak efficiency, detecting issues early and halting execution before failures cascade through the pipeline. Through these practices, organizations can maintain robust, scalable, and efficient CI/CD workflows, improving both the developer experience and overall pipeline performance.
Implementing a fail-fast strategy with centralized error codes and detailed troubleshooting guides establishes a best-practice framework that enhances the resilience and efficiency of Jenkins CI/CD pipelines, enabling organizations to manage software delivery with greater precision and agility.
Code exploration for large legacy codebases is a heavy-lifting task. Manual exploration can become error-prone and time-consuming. Automated data collection and visualization can ease the process to some extent. To extract key insights like code composition, LoC, etc., we may need to use various data collection tools. However, using those tools is challenging, as most of them are commercial. The available FOSS tools either support only smaller code sizes or only a limited set of technology stacks. One such tool is Doxygen, which generates documentation out of codebases and helps extract various metadata elements that can be processed and used for further exploration. However, the challenge with this tool is that it allows very little control over how it collects data and is very heavy to run on large codebases. To solve this problem, we tried to build a custom data collection process to collect call graphs from codebases. The core component of the tool is a parser that parses source code, builds a call graph, and stores it in a graph datastore. This tutorial will guide you through setting up the tree-sitter parsing library in Python and using its different APIs to parse code for various use cases.

Introduction

Tree-sitter is a very powerful and performant parser generator library implemented in C and optimized to run cross-platform. It supports grammars for most of the popular high-level programming languages. It also supports bindings for multiple languages so that it can be integrated with any type of application. For our implementation, we have used the C family of parsers and the Python bindings.

Setup and Installation

To get started with tree-sitter in Python, install the packages below.

Base Package

Plain Text

pip install tree-sitter

This package provides the core classes that the language-specific implementations build on:

Language: A class that defines how to parse a particular language.
Tree: A tree that represents the syntactic structure of a source code file.
Node: A single node within a syntax Tree.
Parser: A class that is used to produce a Tree based on some source code.

Language Packages

This tutorial is focused on parsing codebases written in C family languages, so it requires the packages below, installed using the given commands:

Plain Text

pip install tree-sitter-c
pip install tree-sitter-cpp
pip install tree-sitter-c-sharp

Each of these packages provides a language and parser implementation that can be used to parse code written in the specific language.

Getting Started

Basic parsing requires a parser instance for each language, which follows an abstract API.

Python

from tree_sitter import Language, Parser
import tree_sitter_c as tsc
import tree_sitter_cpp as tscpp
import tree_sitter_c_sharp as tscs

parser_c = Parser(Language(tsc.language()))
parser_cpp = Parser(Language(tscpp.language()))
parser_cs = Parser(Language(tscs.language()))

To parse code, we first read the file and load its contents as bytes.

Python

def readFile(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        file_str = file.read()
        file_bytes = bytes(file_str, "utf8")
        return file_bytes

Then, the loaded bytes are passed to the parse method, which will create and return a tree object representing the Abstract Syntax Tree of the parsed source code.
Python

file_bytes = readFile('C:/Data/RnD/memgraph-demo/alternative.c')
tree = parser_c.parse(file_bytes)
print("tree:- ", tree)

The Tree points to the root node, which has children created according to the grammar rules of that parser.

Traversing the Parsed Tree

The tree can be traversed using multiple parser APIs.

Traversing Using Children

The simplest way to traverse the tree is using the direct children of each node. Each node has a name and type associated with it. The tree does not embed the source values; to retrieve the value of each node, we offset into the source using the start and end bytes of the node.

Python

def node_val(source_byte, node):
    return source_byte[node.start_byte:node.end_byte].decode('utf8')

For example, to retrieve all the member function names in a C file, we first need to reach each function_definition node, then traverse to its function_declarator, and finally to its identifier node.

Python

def print_functions_c(file_bytes, tree):
    root_children = tree.root_node.children
    for root_child in root_children:
        if root_child.type == "function_definition":
            func_def_children = root_child.children
            for func_def_child in func_def_children:
                if func_def_child.type == 'function_declarator':
                    func_dec_children = func_def_child.children
                    for func_dec_child in func_dec_children:
                        if func_dec_child.type == 'identifier':
                            identifier = node_val(file_bytes, func_dec_child)
                            print(identifier)

Traversing Using Recursion

The above code can be simplified by traversing the nodes recursively, skipping all the intermediate children until reaching the final 'identifier' node.

Python

def print_identifiers(node, file_bytes):
    if node.type == 'identifier':
        identifier = node_val(file_bytes, node)
        print('identifier', ":-", identifier)
    for child in node.children:
        print_identifiers(child, file_bytes)

def print_functions_c(file_bytes, tree):
    print_identifiers(tree.root_node, file_bytes)

Traversing Using Cursor API

The parser's tree provides a very efficient Cursor API that keeps track of the node being processed. Based on the logic, it can choose to process the next, previous, parent, or child node:

cursor = tree.walk()
cursor.goto_first_child()
cursor.goto_next_sibling()
cursor.goto_parent()

To traverse using the cursor, you can use it inside recursion to reach a particular node by skipping all unnecessary nodes.

Python

def print_fn_defs_cs(file_bytes, cursor):
    if cursor.node.type == "method_declaration":
        identifier = cursor.node.child_by_field_name('name')
        fn_name = node_val(file_bytes, identifier)
        print("fn_name: ", fn_name)
    if len(cursor.node.children) > 0:
        status = cursor.goto_first_child()
    else:
        status = cursor.goto_next_sibling()
    while status == False:
        status = cursor.goto_parent()
        if status == False:
            break
        status = cursor.goto_next_sibling()
    if status == True:
        print_fn_defs_cs(file_bytes, cursor)

Building the Call Graph

Using one of the above techniques, we can traverse and extract the function definitions and function calls within a source file. Further, we need to push the extracted data into a graph structure so that we can build relationships between method definitions and method calls across source files within a large codebase. This tutorial focused on parsing and retrieving relationships between functions. The next part of this tutorial will focus on how to store and visualize the call graphs using graph stores like neo4j/memgraph.
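As a bridge to that next part, below is a minimal sketch, assuming the networkx package is available, of collecting the extracted caller/callee pairs into an in-memory directed graph before loading them into a graph store. The function names in the example edges are made up purely for illustration.

Python

# Minimal sketch: accumulate (caller, callee) pairs produced by the
# tree-sitter traversal into a directed graph. The edge data is illustrative.
import networkx as nx

call_graph = nx.DiGraph()

def add_call(caller: str, callee: str) -> None:
    # Nodes are function names; an edge means "caller invokes callee"
    call_graph.add_edge(caller, callee)

# Example usage with made-up function names
add_call("main", "parse_args")
add_call("main", "run_pipeline")
add_call("run_pipeline", "parse_args")

# Callers of parse_args, and everything reachable from main
print(list(call_graph.predecessors("parse_args")))
print(list(nx.descendants(call_graph, "main")))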
When developers first think about managing data or choosing a database for their application, the first thing that often comes to their mind is — that’s right, a table. They then set up different tables for the different types of data, like having one table for users and another for products, orders, and so on. With data spread across different tables, they are bound to have relationships with each other. Therefore, the tables will have references to the items in other tables via a foreign key. And finally, they may end up choosing a relational database every time to solve their problems. Especially when dealing with relations, the idea of a non-relational database might seem very unintuitive.

But the problem is — data isn’t always flat. Data requirements can be complex, and when developers visualize their data without considering databases, it's often not tabular. Instead, it may appear nested, hierarchical, or even graph-like.

Graph, nested, and hierarchical data

You need not imagine a table every time you start to work with data. In modern applications, data often needs a flexible schema with complex data types. Hence, flattening it into a table may not always be the best idea. On top of that, relational databases also come with their own scaling limitations. As you scale, you may need to worry about sharding, which often requires significant application-level changes.

Enter non-relational databases, more commonly known as NoSQL. You may be missing out on the full potential of NoSQL if you have been shunning it in your design choices. In this article, we quickly define what non-relational databases are and then provide different NoSQL database choices for different real-life problems. Towards the end, we will also cover what advantages NoSQL databases provide over relational ones, and touch upon scenarios where a relational database is still a better choice.

Non-Relational Databases, or NoSQL

Simply put, these databases store data in a non-tabular format. They come in different types, each built for specific purposes and the kind of non-structured data they can support. The most common ones are:

Document databases: Store data in JSON-like documents.
Key-value databases: Store data as key-value pairs under unique keys.
Graph databases: Use nodes, edges, and properties to represent and store relationships between data points.

Other types are column-oriented, in-memory, and time-series databases. Different NoSQL database providers offer features such as creating indexes on certain fields, developer-friendly SDKs, high availability, transactional operations, and the ability to easily scale your database up or down.

Solving Real-Life Problems With NoSQL

Let’s see some examples where we solve real-life problems with NoSQL databases.

Building a Blog Application That Supports Comments and Tags

Document databases are best for such content management systems. Each piece of content can be a separate document. The schema is flexible, and each document can contain sub-collections. You can leverage the array-field indexing capabilities of document database providers to perform complex queries, e.g., fetching all blogs with their tags.

Social and Professional Networking Platforms

Graph databases help define complex relationships between people. Easily find friends of friends via graph searches.

IoT Sensor Data

Use a time-series database to handle massive amounts of timestamped data from sensor devices.
Shopping Cart

Use a key-value database, which offers high concurrency with quick reads and writes. You can also use key-value databases to store users' session information, and the shopping cart can be a field within the session (for an individual shopping session).

Recommendation Engines

Use graph databases to map relationships between users, products, and preferences.

Where NoSQL Outperforms Relational Databases

Below are some of the areas where NoSQL databases provide advantages over relational databases:

Developer productivity: NoSQL database providers offer APIs and SDKs tailored for modern application development, enabling faster prototyping and development.
Scalability: NoSQL databases scale horizontally (where you simply need to add more servers or nodes), which is cost-effective and easier to manage compared to the vertical scaling required by relational databases.
Flexibility: NoSQL offers a flexible schema, which is perfect for agile development and evolving application requirements.
Handling nested data: Oftentimes, data is nested or hierarchical, like in JSON or XML formats. NoSQL databases can support these formats natively, whereas a complex transformation may be required when using relational databases for them.

Where Relational Databases Still Hold an Edge

Below are some of the use cases where relational databases remain relevant and are a good idea.

Analytics and reporting: Relational databases are optimized for ad-hoc queries, aggregations, and reporting. For analytical and reporting use cases, relational databases are still a go-to choice.
Heavy use of relations and joins: Although you can still manage and work with relations in NoSQL databases, if your application depends heavily on joins across different datasets, a relational database can still be a good idea.
Consistency over availability: In scenarios where data consistency is more important than availability (e.g., financial transactions), relational databases provide stronger guarantees.

And this concludes our article. We explored what NoSQL databases are and how they address real-life challenges in data management. NoSQL databases open up a world of possibilities for modern applications, from their flexibility in handling complex data structures to the powerful features offered by providers. For those who haven’t yet tried their hand at NoSQL, I hope this inspires you to give it a shot in your next project — you might just find it to be the perfect fit for your needs.
Cold emailing remains one of the most effective ways to reach potential employers or clients, but crafting personalized, compelling messages at scale can be challenging. CrewAI is a framework for creating AI agent teams to automate and enhance cold email outreach. In this tutorial, we'll build a sophisticated cold email system using CrewAI that researches companies, generates personalized templates, and provides strategic insights.

The Challenge With Traditional Cold Emailing

Traditional cold emailing faces several challenges:

Time-consuming research for each company
Difficulty maintaining personalization at scale
Inconsistent messaging and value propositions
Limited ability to analyze and improve performance

Our CrewAI-powered system addresses these challenges by creating a crew of specialized AI agents who work together to craft effective cold emails.

Setting Up the Project

First, let's set up our project structure:

cold_emailer/
├── config/
│   ├── agents.yaml
│   └── tasks.yaml
├── cold_emailer_agent/
│   ├── __init__.py
│   └── crew.py
└── main.py

Install the required dependencies:

pip install crewai crewai-tools

Defining Our AI Agents

Our system uses three specialized agents, each with a specific role:

Email researcher: Investigates companies and identifies personalization opportunities
Email strategist: Crafts compelling email templates based on research
Outreach analyst: Analyzes templates and suggests improvements

Here's how we configure our agents in agents.yaml:

YAML

email_researcher:
  role: >
    Cold Email Research Specialist for {industry}
  goal: >
    Research companies and identify personalized connection points for cold emails
  backstory: >
    You're an expert at finding meaningful insights about companies and their
    pain points. You combine public information, technical analysis, and
    industry trends to identify compelling conversation starters for cold emails.

# ... [similar configurations for email_strategist and outreach_analyst]

Creating Tasks for Our Agents

Each agent needs specific tasks to complete. We define these in tasks.yaml:

YAML

research_task:
  description: >
    Research {company_name} to identify:
    1. Recent company news, tech stack changes, or public challenges
    2. Specific technical improvement opportunities
    3. Relevant projects or innovations they might be interested in
    4. Key decision makers and their priorities
  expected_output: >
    A detailed research report with specific insights that can be used to
    personalize cold emails
  agent: email_researcher

# ... [similar configurations for strategy_task and analysis_task]

Implementing the CrewAI System

The heart of our system is the ColdEmailCrew class. This orchestrates our agents and tasks:

Python

from crewai import Agent, Crew, Process
from crewai.project import CrewBase, agent, crew
from crewai_tools import SerperDevTool, SeleniumScrapingTool

@CrewBase
class ColdEmailCrew:
    """Crew for generating personalized cold emails"""
    agents_config = 'config/agents.yaml'
    tasks_config = 'config/tasks.yaml'

    @agent
    def email_researcher(self) -> Agent:
        """Create the research specialist agent"""
        return Agent(
            config=self.agents_config['email_researcher'],
            verbose=True,
            tools=[SerperDevTool(), SeleniumScrapingTool()]
        )

    # ...
    # [similar methods for email_strategist and outreach_analyst]

    @crew
    def crew(self) -> Crew:
        """Creates the ColdEmailCrew"""
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True
        )

Running the System

To use our cold email system:

Python

from cold_emailer_agent.crew import ColdEmailCrew

def run():
    """Run the crew with example inputs"""
    inputs = {
        "industry": "tech",
        "company_name": "Google"
    }
    # Create and run the crew
    ColdEmailCrew().crew().kickoff(inputs=inputs)

if __name__ == "__main__":
    run()

Example Output

When we run our system targeting Google in the tech industry, it generates:

Research insights about Google's tech stack and infrastructure
A personalized email template with multiple subject line variations
Detailed analysis with A/B testing suggestions

The email template includes personalization opportunities:

Subject Line: Improving Google's Tech Stack: Insights from Industry Experts

Hi [Recipient],

I came across your work on improving Google's tech stack, and I wanted to share some insights that might be relevant to your team. As we've analyzed Google's infrastructure, we noticed that they're using a combination of open-source technologies like Kubernetes, TensorFlow, and Apache Beam. While this is impressive, there are potential areas for improvement to enhance scalability and efficiency.

[Rest of template...]

Analysis and Improvements

The system also provides a detailed analysis of the generated template:

Personalization effectiveness score: 7/10
Value proposition clarity: 8/10
Specific improvement recommendations
A/B testing scenarios for optimization

Future Enhancements

Potential improvements to the system could include:

Integration with email delivery systems
Advanced analytics tracking
Machine learning for response prediction
Dynamic template adjustment based on feedback

Conclusion

Combining specialized AI agents for research, strategy, and analysis can create more effective, personalized cold emails at scale. The system demonstrates how AI can augment human capabilities in business communication while maintaining authenticity and relevance. Try implementing this system in your outreach efforts and see how it can transform your cold email process. Test and refine the output to match your specific needs and voice.
With a traditional lexical (or keyword-based) search, we find documents that contain the exact words we searched for. Keyword search excels at precision but struggles with alternate phrasing or natural language. Semantic search addresses these limitations by capturing the intent behind documents and user queries. This is typically done by leveraging vector embeddings to map documents and queries into a high-dimensional space and computing vector similarity to retrieve relevant results.

For many systems, a single search method falls short, resulting in incomplete information being shown to users. Combining the strengths of both search methods described above allows us to deliver an effective search experience. Keyword-based search is supported well by systems like Elasticsearch and Apache Solr. Semantic search typically requires storage in a vector database, and there is a wide range of solutions. This post explains how we can support hybrid search involving both lexical and semantic search using a single, familiar storage system: Postgres.

Let’s suppose we have the following table used by an application that allows users to search for products via keywords or natural language:

SQL

CREATE TABLE products (
    id bigserial PRIMARY KEY,
    description VARCHAR(255),
    embedding vector(384)
);

The description column contains a text/natural-language description of the product. Postgres supports full-text search over this column out of the box, and we can also create a dedicated index (shown later) to accelerate full-text retrieval. The embedding column stores vector (float) representations of product descriptions, capturing semantic meaning rather than words. The pgvector extension in Postgres brings with it the vector data type and vector similarity metrics — L2, cosine, and dot product distances. There are several ways of generating embeddings, e.g., using word-level embeddings such as Word2Vec, sentence/document embeddings such as SBERT, or embeddings from transformer-based models such as BERT.

For demonstration, we will insert the following data into the database:

SQL

INSERT INTO products (description) VALUES
('Organic Cotton Baby Onesie - Newborn Size, Blue'),
('Soft Crib Sheet for Newborn, Hypoallergenic'),
('Baby Monitor with Night Vision and Two-Way Audio'),
('Diaper Bag Backpack with Changing Pad - Unisex Design'),
('Stroller for Infants and Toddlers, Lightweight'),
('Car Seat for Newborn, Rear-Facing, Extra Safe'),
('Baby Food Maker, Steamer and Blender Combo'),
('Toddler Sippy Cup, Spill-Proof, BPA-Free'),
('Educational Toys for 6-Month-Old Baby, Colorful Blocks'),
('Baby Clothes Set - 3 Pack, Cotton, 0-3 Months'),
('High Chair for Baby, Adjustable Height, Easy to Clean'),
('Baby Carrier Wrap, Ergonomic Design for Newborns'),
('Nursing Pillow for Breastfeeding, Machine Washable Cover'),
('Baby Bath Tub, Non-Slip, for Newborn and Infant'),
('Baby Skincare Products - Lotion, Shampoo, Wash - Organic');

For embeddings, I used a SentenceTransformer model (aka SBERT) to generate embeddings and then stored them in the database.
The following Python code demonstrates this:

Python

descriptions = [product[1] for product in products]
model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(descriptions)

# Update the database with embeddings
for i, product in enumerate(products):
    product_id = product[0]
    embedding = embeddings[i]
    # Convert to a Python list and construct the vector string representation
    embedding_str = str(embedding.tolist())
    cur.execute("UPDATE products SET embedding = %s WHERE id = %s", (embedding_str, product_id))

# Commit changes and close connection
conn.commit()

Full-Text Search

Postgres provides extensive out-of-the-box support for keyword search, and we can write a query like the following for keyword-based retrieval. Let’s say we want to search for sleep accessories for a baby. We might search with the following query:

SQL

SELECT id, description
FROM products
WHERE description @@ to_tsquery('english', 'crib | baby | bed');

This returns the following product back:

"Soft Crib Sheet for Newborn, Hypoallergenic"

Note: to_tsquery searches for lexemes/normalized keywords, so replacing newborn with newborns or babies also returns the same result.

The above is, of course, a simple example, and Postgres’s full-text search functionality allows several customizations, e.g., skipping certain words, processing synonyms, and using sophisticated parsing, by overriding the default text search config. Although these queries will work without an index, most applications find this approach too slow, except perhaps for occasional ad-hoc searches. Practical use of text searching usually requires creating an index. The following code demonstrates how we can create a GIN index (Generalized Inverted Index) on the description column and use it for efficient search.

SQL

-- Create a tsvector column (you can add this to your existing table)
ALTER TABLE products ADD COLUMN description_tsv tsvector;

-- Update the tsvector column with indexed data from the description column
UPDATE products SET description_tsv = to_tsvector('english', description);

-- Create a GIN index on the tsvector column
CREATE INDEX idx_products_description_tsv ON products USING gin(description_tsv);

Semantic Search Example

Let’s now try to execute a semantic search request for our query intent — "baby sleeping accessories." To do this, we compute the query embedding (as above) and pick the most similar products by vector distance (in this case, cosine distance).
The following code demonstrates this:

Python

# The query string
query_string = 'baby sleeping accessories'

# Generate embedding for the query string
query_embedding = model.encode(query_string).tolist()

# Construct the SQL query using pgvector's cosine distance operator (<=>)
# Assuming you have an index that supports cosine distance (e.g., ivfflat with vector_cosine_ops)
sql_query = """
SELECT id, description, (embedding <=> %s::vector) as similarity
FROM products
ORDER BY similarity
LIMIT 5;
"""

# Execute the query
cur.execute(sql_query, (query_embedding,))

# Fetch and print the results
results = cur.fetchall()
for result in results:
    product_id, description, similarity = result
    print(f"ID: {product_id}, Description: {description}, Similarity: {similarity}")

cur.close()
conn.close()

This gives us the following results:

Plain Text

ID: 12, Description: Baby Carrier Wrap, Ergonomic Design for Newborns, Similarity: 0.9956936200879117
ID: 2, Description: Soft Crib Sheet for Newborn, Hypoallergenic, Similarity: 1.0233573590998544
ID: 5, Description: Stroller for Infants and Toddlers, Lightweight, Similarity: 1.078171715208051
ID: 6, Description: Car Seat for Newborn, Rear-Facing, Extra Safe, Similarity: 1.08259154868697
ID: 3, Description: Baby Monitor with Night Vision and Two-Way Audio, Similarity: 1.0902734271784085

Along with each result, we also get back its distance (lower is better). As we can see, we get a richer set of results with embedding search, which nicely augments the keyword-based search.

By default, pgvector performs exact nearest neighbor search, which guarantees perfect recall. However, this approach becomes quite expensive as the size of the dataset increases. We can add an index that trades off recall for speed. One example is the IVFFlat (Inverted File with Flat Compression) index in Postgres, which works by dividing the vector space into clusters using k-means clustering. During a search, it identifies the clusters closest to the query vector and performs a linear scan within those selected clusters, calculating the exact distances between the query vector and the vectors in those clusters. The following code defines how such an index can be created:

SQL

CREATE INDEX ON products USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

lists indicates the number of clusters to create. vector_cosine_ops indicates the distance metric we are using (cosine, inner product, or Euclidean/L2).

Fusion of Results

The two methods described above excel in different scenarios and complement each other. Combining the results of both methods produces robust search results. Reciprocal Rank Fusion (RRF) is a method for combining multiple result sets with different relevance indicators into a single result set. RRF requires no tuning, and the different relevance indicators do not have to be related to each other to achieve high-quality results. The core of RRF is captured in its formula:

RRF(d) = Σ_{r ∈ R} 1 / (k + r(d))

Where:
- d is a document
- R is the set of rankers (retrievers)
- k is a constant (typically 60)
- r(d) is the rank of document d in ranker r

In our example, we’d do the following:

Calculate the contribution of each product in each result set by taking the reciprocal of its rank after adding a constant. This constant prevents top-ranked products from dominating the final score and allows lower-ranked products to contribute meaningfully.
Sum the rank reciprocals from all result sets to get the final RRF score of a product.
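To make the fusion step concrete, here is a small, self-contained Python sketch of RRF over two ranked lists of product IDs; the example rankings and the k value of 60 are illustrative, not output from the queries above.

Python

# Minimal RRF sketch: fuse two ranked lists of product IDs.
# The example rankings are illustrative only.
def rrf_fuse(result_sets, k=60):
    scores = {}
    for ranked_ids in result_sets:
        for rank, doc_id in enumerate(ranked_ids, start=1):
            # Each ranker contributes 1 / (k + rank) to the document's score
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Higher fused score = better
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)

keyword_results = [2, 10, 14]        # e.g., IDs ranked by ts_rank
semantic_results = [12, 2, 5, 6, 3]  # e.g., IDs ranked by embedding distance

print(rrf_fuse([keyword_results, semantic_results]))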
For keyword search, Postgres provides a ranking function, ts_rank (and some variants), whose score can be used to derive the rank of a product inside the result set. For semantic search, we can use the embedding distance to calculate the rank of a product in the result set. As sketched above, the fusion can be implemented in SQL by using CTEs for each search method and combining them at the end. Further, we could also use an ML model to rerank the results after combining. Due to its high computational cost, ML model-based reranking is applied after the initial retrieval, which reduces the result set to a small set of promising candidates.

Conclusion

With the components described above, we built an intelligent search pipeline that integrates:
- Full-text search for precise keyword matching
- Vector search for semantic matching
- Result fusion for combining results and reranking using ML

We accomplished this using a single database system, where all the data is stored. By avoiding integration with separate search engines or databases, we eliminated the need for multiple tech stacks and reduced system complexity.
See previous Part 1 and Part 2. The relationship between your event definitions and the event streams themselves is a major design decision. One of the most common questions I get is, “Is it okay to put multiple event types in one stream? Or should we publish each event type to its own stream?” This article explores the factors that contribute to answering these questions and offers a set of recommendations that should help you find the best answer for your own use cases.

Example Consumer Use Cases

- General alerting of state changes (deltas)
- Processing sequences of events (deltas)
- Transferring state (facts)
- Mixing facts and deltas

The consumer’s use case should be a top consideration when deciding how to structure your event streams. Event streams are replayable sources of data that are written only once, but that can be read many times by many different consumers. We want to make it as easy as possible for consumers to use the data according to their own needs.

Use Case: General Alerting of Changes

Deltas work very well for general change alerting. Applications can respond to the delta events that another application exposes about its internal state changes. Splitting up events so that there is only one type per stream provides high granularity and permits consumer applications to subscribe to only the deltas they care about.

Use Case: Processing Sequences of Delta Events

But what if a single application needs to read several deltas, and the ordering between events is very important? Following a one-event-type-per-stream strategy introduces the risk that events may be read and processed out of order, giving inconsistent sequencing results. Stream processors, like Kafka Streams and Flink, typically contain logic to process events in both ascending timestamp and offset order, a process that I call “event scheduling.” For example, Kafka Streams uses a best-effort algorithm to select the next record to process, while Flink uses a watermarking strategy to process records based on timestamps. Be warned that not all stream processing frameworks support event scheduling, leading to wildly diverging processing orders based on race condition outcomes. At the end of the day, even event scheduling is a best-effort attempt. Out-of-order processing may still occur due to intermittent failures of the application or hardware, as well as unresolved corner cases and race conditions in the frameworks themselves. Note that this is not nearly as dire as it seems. Many (if not the vast majority) of streaming use cases aren’t that sensitive to processing order between topics. For those that are sensitive, watermarking and event scheduling tend to work pretty well in the majority of cases. And for those event sequences that need perfectly strict ordering? Well, read on.

What About Strict Ordering?

But what do you do if you need something with stronger guarantees? A precise and strict ordering of events may be a significant factor for your business use case. In this case, you may be better off putting all of your events into a single event stream so that your consumer receives them in the same order as they are written. You also need a consistent partitioning strategy to ensure that all events of the same key go to the same partition, as Kafka only guarantees order on a per-partition basis. Note that this technique is not about reducing the number of topics you’re using — topics are relatively cheap, and you should choose to build your topics based on the data they’re carrying and the purposes they’re meant to serve — not to simply cut down on topic count.
Apache Kafka is perfectly capable of handling thousands of topics without any problem.

Single Stream, Multiple Delta Types

Putting related event types into a single topic partition provides a strict incremental order for consumer processing, but it requires that all events be written by a single producer, which needs strict control over the ordering of events. In this example, we have merged all of the adds, removes, and discount codes for the shopping cart into a single partition of a single event stream.

Use Case: Processing Sequences of Delta Events

Zooming back out, you can see a single consumer coupled with this stream of events. It must be able to understand and interpret each of the types in the stream. It’s important not to turn your topic into a dumping ground for multiple event types and expect your consumers to simply figure it out. Rather, the consumer must know how to process each delta type, and any new types or changes to existing types would need to be negotiated between the application owners.

Use Flink SQL to Split the Stream Up

You can also use a stream processor like Flink to split the single cart events stream up into an event stream per delta, writing each event to a new topic. Consumers can choose to subscribe to these purpose-built delta streams, or they can subscribe to the original stream and simply filter out events they do not care about.

Word of Caution

A word of caution, however. This pattern can result in a very strong coupling between the producer and the consumer service. Usually, it is only suitable for applications that are intended to be strongly coupled, such as a pair of systems using event sourcing, and not for general-purpose usage. You should also ask yourself if these two applications merit separation or if they should be redesigned into a single application.

Use Case: Transferring State with Facts

Facts provide a capability known as Event-Carried State Transfer. Each event provides a complete view of the state of a particular entity at a particular point in time. Fact events present a much better option for transferring state, do not require the consumer to interpret a sequence of events, and offer much looser coupling. In this case, only a single event type is used per event stream — there is no mixing of different fact types within a single stream. Keeping only one fact type per stream makes it much easier to transfer read-only state to any application that needs access to it. Streams of Facts effectively act as data building blocks for you to compose purpose-built applications and services for solving your business problems.

Single Fact Type Per Stream

The convention of one type of fact per stream shows up again when you look into the tools you can build your applications with — like Kafka Streams or Flink. In this example, a Flink SQL application materializes the item facts into a table. The query specifies the table schema, the Kafka topic source, the key column, and the key and value schema format. Flink SQL enforces a strict schema definition and will throw away incoming events that do not adhere to it. This is identical to how a relational database will throw an exception if you try to insert data that doesn’t meet the table schema requirements.

Joining Disparate Fact Streams

You can leverage Flink SQL’s join functionality when consuming multiple types of facts from different streams, selecting only the fields you need for your own business logic and discarding the rest.
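A rough sketch of what the fact materialization and join might look like in Flink SQL is shown below; the topic names, field names, connector choice, and Avro formats are illustrative assumptions:

SQL
-- Materialize the item facts topic into a table (schema, topic, key, and formats are assumed for illustration)
CREATE TABLE item_facts (
    item_id  STRING,
    name     STRING,
    price    DECIMAL(10, 2),
    PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'item.facts',
    'properties.bootstrap.servers' = 'broker:9092',
    'key.format' = 'avro',
    'value.format' = 'avro'
);

-- Materialize the inventory facts topic the same way
CREATE TABLE inventory_facts (
    item_id  STRING,
    stock    INT,
    PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'inventory.facts',
    'properties.bootstrap.servers' = 'broker:9092',
    'key.format' = 'avro',
    'value.format' = 'avro'
);

-- Sink table for the joined, filtered facts
CREATE TABLE in_stock_item_facts (
    item_id  STRING,
    name     STRING,
    price    DECIMAL(10, 2),
    stock    INT,
    PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'in-stock-item.facts',
    'properties.bootstrap.servers' = 'broker:9092',
    'key.format' = 'avro',
    'value.format' = 'avro'
);

-- Join the two fact streams, keep only in-stock items, and emit the result
INSERT INTO in_stock_item_facts
SELECT i.item_id, i.name, i.price, inv.stock
FROM item_facts AS i
JOIN inventory_facts AS inv ON i.item_id = inv.item_id
WHERE inv.stock >= 1;

The upsert-kafka connector and Avro formats here are just one plausible configuration; the important part is that each fact table has exactly one well-defined schema.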
In this example, the Flink SQL application consumes from both inventory and item facts and selects just the ID, price, name, and stock, but only keeps records where there is at least one item in stock. The data is filtered and joined together, then emitted to the in-stock items facts stream, which can be used by any application that needs it. Best Practice: Record the Entire State in One Fact When recording an event, it’s important to keep everything that happened in a single detailed event. Consider an order (above) that consists of both a cart entity and a user entity. When creating the order event, we insert all of the cart information as well as all of the user information for the user at that point in time. We record the event as a single atomic message to maintain an accurate representation of what happened. We do not split it up into multiple events in several other topics! Consumers are free to select only the data they really want from the event, plus you can always split up the compound event. However, it is far more difficult to reconstruct the original event if you split it up from the beginning. A best practice is to give the initial event a unique ID, and then propagate it down to any derivative events. This provides event tracing. We will cover event IDs in more detail in a future post. Use Case: Mixing Facts and Deltas Consumers can also compose applications by selecting the fact streams that they need and combining them with selected deltas. This approach is best served by single types per event stream, as it allows for easy mixing of data according to each consumer's needs. Summary Single streams of single delta types make it easy for applications to respond to specific edge conditions, but they remain responsible for building up their own state and applying their own business logic. A single delta per stream can lead to difficulties when trying to create a perfect aggregation of states. It can also be challenging when trying to figure out which events need to be considered to build the aggregate. Put multiple event types in the same stream if you are concerned about strict ordering, such as building up an aggregate from a series of deltas. You’ll have all of the data necessary to compose the final state, and in precisely the same order that it was published by the producer. The downside is that you must consume and process each event, even if it’s one you don’t really care about. And finally, use a single event type for fact streams. This is identical to how you would store this information in a relational database, with a single well-defined schema per table. Your stream consumers can mix, match, and blend the fact events they need for their own use cases using tools like Kafka Streams or Flink. There’s one more part to go in this event design series. Stay tuned for the next one, where we’ll wrap it up, covering elements such as workflows, assorted best practices, and basics of schema evolution.
The Internet of Things (IoT) is transforming industries by enabling seamless communication between a wide array of devices, from simple sensors to complex industrial machines. Two of the most prominent protocols driving IoT systems are OPC-UA (Open Platform Communications - Unified Architecture) and MQTT (Message Queuing Telemetry Transport). Each protocol plays a vital role in facilitating data exchange, but their use cases and strengths vary significantly. This article delves into how these protocols work, their advantages, and how to implement them using Python for creating robust IoT solutions.

Why OPC-UA and MQTT?

OPC-UA: Built for Industrial Automation

OPC-UA is a platform-independent, service-oriented protocol tailored to industrial environments. It provides:

- Complex data models: Unlike simpler telemetry protocols, OPC-UA supports hierarchical data structures, making it suitable for detailed machine-to-machine (M2M) communication.
- Security: Features like encryption, authentication, and data integrity make OPC-UA a secure choice for industrial automation.
- Interoperability: It ensures seamless communication between devices from different manufacturers by adhering to a standardized information model.
- Rich functionality: Beyond simple data exchange, OPC-UA supports subscriptions, event monitoring, and remote method invocation, making it ideal for SCADA systems, MES (Manufacturing Execution Systems), and IIoT (Industrial Internet of Things) applications.

MQTT: Lightweight and Real-Time

MQTT is a lightweight, publish-subscribe protocol designed for devices with constrained resources. It offers:

- Minimal overhead: MQTT uses a lightweight messaging format, making it highly efficient for low-bandwidth networks.
- Real-time communication: The publish-subscribe model allows clients to receive updates as soon as they are published.
- Scalability: With an MQTT broker at the center, it can support thousands of devices in large-scale IoT deployments.
- Flexibility: It’s a go-to protocol for telemetry and event-driven applications, such as smart homes, health monitoring, and connected vehicles.

Architecture Overview

OPC-UA

In an OPC-UA system:

- OPC-UA servers: Devices or systems (e.g., sensors, PLCs, or SCADA systems) host data and expose it to clients.
- OPC-UA clients: Applications or systems (e.g., MES or analytics software) connect to servers to retrieve or subscribe to data.
- Secure communication: Built-in encryption and access control ensure safe data exchange.

Source: Emqx.com

MQTT

In an MQTT-based architecture:

- Publishers: Devices (e.g., sensors, microcontrollers) publish data to an MQTT broker.
- Subscribers: Applications or services subscribe to topics of interest to receive updates.
- MQTT broker: Acts as the central hub, managing message distribution and ensuring scalability.

Python Implementations
1. Setting Up an OPC-UA Server

Here’s how to create a simple OPC-UA server to expose a temperature sensor value:

Python
import time

from opcua import Server

# Create an OPC-UA Server
server = Server()

# Set server endpoint
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

# Add a namespace
namespace = server.register_namespace("IoT_Example")

# Create an object node
node = server.nodes.objects.add_object(namespace, "IoTDevice")

# Add a variable to the object
temperature = node.add_variable(namespace, "Temperature", 0)
temperature.set_writable()  # Allow variable to be writable

# Start the server
server.start()
print("OPC-UA Server is running at opc.tcp://0.0.0.0:4840/freeopcua/server/")

try:
    while True:
        # Update the temperature value
        temperature.set_value(35.5)  # Example value
        print(f"Temperature updated: {temperature.get_value()}")
        time.sleep(1)  # Pause between updates to avoid a busy loop
except KeyboardInterrupt:
    print("Shutting down server...")
    server.stop()

2. Setting Up an OPC-UA Client

Here’s how to retrieve the temperature data from the OPC-UA server:

Python
from opcua import Client

# Connect to the OPC-UA Server
client = Client("opc.tcp://127.0.0.1:4840/freeopcua/server/")  # Use the server's correct URL
client.connect()
print("Connected to the OPC-UA server.")

try:
    # Browse the root and objects node
    root = client.get_root_node()
    objects = root.get_child(["0:Objects"])

    # Get the IoTDevice node
    iot_device = client.get_node("ns=2;i=1")  # Replace with correct Node ID for IoTDevice

    # Fetch the Temperature variable
    temperature_node = client.get_node("ns=2;i=2")  # Use the correct Node ID found during browsing
    temperature_value = temperature_node.get_value()
    print(f"Current Temperature: {temperature_value}°C")
finally:
    # Disconnect from the server
    client.disconnect()
    print("Disconnected from the server.")

3. Setting Up an MQTT Publisher

Publish temperature sensor data to an MQTT broker using Python:

Python
import time

import paho.mqtt.client as mqtt

# MQTT Broker details
broker = "test.mosquitto.org"
port = 1883
topic = "iot/temperature"

# Create an MQTT client with explicit callbacks
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker!")
    else:
        print(f"Connection failed with code {rc}")

# Create and configure client
client = mqtt.Client()
client.on_connect = on_connect  # Assign connect callback

# Connect to the broker
client.connect(broker, port)

# Publish a message
client.loop_start()  # Start the network loop
time.sleep(1)  # Give the connection a moment to complete before publishing
client.publish(topic, "Temperature: 15.5°C")
print(f"Message published to topic '{topic}'")
client.loop_stop()  # Stop the loop

4. Setting Up an MQTT Subscriber

Receive and display temperature data from the MQTT broker:

Python
import paho.mqtt.client as mqtt

# MQTT Broker details
broker = "test.mosquitto.org"
port = 1883
topic = "iot/temperature"

# Define the callback functions explicitly
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker and subscribed to topic.")
        client.subscribe(topic)
    else:
        print(f"Connection failed with code {rc}")

def on_message(client, userdata, msg):
    print(f"Received message: {msg.payload.decode()} from topic: {msg.topic}")

# Create an MQTT client and explicitly assign callbacks
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# Connect to the broker
client.connect(broker, port)

# Start the network loop to listen for messages
print("Listening for messages...")
client.loop_forever()

Conclusion

OPC-UA and MQTT complement each other in IoT systems.
OPC-UA provides rich, secure, and structured communication for industrial devices, while MQTT ensures lightweight, scalable data distribution for telemetry and cloud integration. By leveraging Python, you can seamlessly implement and integrate these protocols to build versatile IoT solutions. These Python examples offer a starting point for practical implementation. As IoT ecosystems grow more complex, combining OPC-UA and MQTT will unlock new opportunities for efficiency and innovation.
The Jenkins pipeline below automates the secure management of Kubernetes sealed secrets across multiple environments and clusters, including AKS (Non-Production), GKE (Production Cluster 1), and EKS (Production Cluster 2). It dynamically adapts based on the selected environment, processes secrets in parallel for scalability, and ensures secure storage of credentials and artifacts. With features like dynamic cluster mapping, parallel execution, and post-build artifact archiving, the pipeline is optimized for efficiency, security, and flexibility in a multi-cloud Kubernetes landscape.

Key Features and Workflow

Dynamic Cluster Selection

Based on the ENVIRONMENT parameter, the pipeline dynamically determines the target clusters:
- Non-Production: Targets the AKS cluster using the Stage credential.
- Production: Targets both GKE and EKS clusters with the Production_1 and Production_2 credentials, respectively.

Parallel Processing

The Process Clusters stage executes cluster-specific workflows in parallel, significantly reducing runtime for multi-cluster operations. For example:
- In Production, the pipeline simultaneously processes the GKE and EKS clusters.
- In Non-Production, only the AKS cluster is processed.

Secure Sealed Secrets Workflow

- Decodes the Base64-encoded Secrets.yaml file.
- Fetches the public certificate from the Sealed Secrets controller.
- Encrypts the secrets for the respective cluster and namespace.
- Generates sealed-secrets.yaml artifacts.

Dynamic and Reusable Pipeline

The cluster list and credentials are dynamically configured, making the pipeline adaptable for additional clusters or environments with minimal changes.

Post-Build Artifact Management

Artifacts for each cluster, including sealed-secrets.yaml and metadata files (README.txt), are archived and made accessible in the Jenkins UI for easy retrieval.

Parallel Execution Logic

The pipeline uses Groovy’s parallel directive to process clusters concurrently:

Cluster Mapping

The ENVIRONMENT parameter determines the cluster list:
- Non-Production: Includes only the AKS cluster.
- Production: Includes both GKE and EKS clusters.

Parallel Stage Creation

For each cluster:
- A separate parallel stage is defined dynamically with cluster-specific names, credentials, and directories.
- Each stage independently fetches certificates and generates sealed secrets.

Execution

The parallel block runs all stages concurrently, optimizing execution time.

Scenario 1: Non-Production (AKS)

Selected environment: Non-Production. The pipeline:
- Processes the AKS cluster only.
- Generates sealed secrets for AKS.
- Archives artifacts for the AKS environment.

Scenario 2: Production (GKE and EKS)

Selected environment: Production. The pipeline:
- Processes both GKE and EKS clusters simultaneously.
- Generates separate sealed secrets for each cluster.
- Archives artifacts for both GKE and EKS.

Detailed Explanation of the Jenkins Pipeline Script

This Jenkins pipeline script automates the process of managing Kubernetes sealed secrets in a multi-cloud environment consisting of AKS, GKE, and EKS clusters. Below is a detailed step-by-step explanation of how the script functions.
Groovy
parameters {
    string(name: 'NAMESPACE', defaultValue: 'default', description: 'Kubernetes namespace for the sealed secret')
    choice(
        name: 'ENVIRONMENT',
        choices: ['Non-Production', 'Production'],
        description: 'Select the target environment'
    )
    base64File(name: 'SECRETS_YAML', description: 'Upload Secrets.yaml file to apply to the cluster')
    booleanParam(name: 'STORE_CERT', defaultValue: true, description: 'Store the public certificate for future use')
}

- NAMESPACE: Specifies the target namespace in Kubernetes where the sealed secrets will be applied.
- ENVIRONMENT: Determines whether the pipeline operates on Non-Production (AKS) or Production (GKE and EKS).
- SECRETS_YAML: Accepts the Base64-encoded YAML file containing the sensitive data to be sealed.
- STORE_CERT: A flag indicating whether the public certificate used for sealing secrets should be archived for future use.

Environment Variables

Groovy
environment {
    WORK_DIR = '/tmp/jenkins-k8s-apply'
    CONTROLLER_NAMESPACE = 'kube-system'
    CONTROLLER_NAME = 'sealed-secrets'
    CERT_FILE = 'sealed-secrets-cert.pem'
    DOCKER_IMAGE = 'docker-dind-kube-secret'
    ARTIFACTS_DIR = 'sealed-secrets-artifacts'
}

- WORK_DIR: Temporary workspace for processing files during the pipeline execution.
- CONTROLLER_NAMESPACE and CONTROLLER_NAME: Define the location and name of the Sealed Secrets controller in the Kubernetes cluster.
- CERT_FILE: Name of the public certificate file used for sealing secrets.
- DOCKER_IMAGE: Docker image containing the necessary tools for processing secrets (e.g., kubeseal).
- ARTIFACTS_DIR: Directory where artifacts (sealed secrets and metadata) are stored.

Environment Setup

Groovy
stage('Environment Setup') {
    steps {
        script {
            echo "Selected Environment: ${params.ENVIRONMENT}"
            def clusters = []
            if (params.ENVIRONMENT == 'Production') {
                clusters = [
                    [id: 'prod-cluster-1', name: 'Production Cluster 1', credentialId: 'Production_1'],
                    [id: 'prod-cluster-2', name: 'Production Cluster 2', credentialId: 'Production_2']
                ]
            } else {
                clusters = [
                    [id: 'non-prod-cluster', name: 'Non-Production Cluster', credentialId: 'Stage']
                ]
            }
            env.CLUSTER_IDS = clusters.collect { it.id }.join(',')
            clusters.each { cluster ->
                env["CLUSTER_${cluster.id}_NAME"] = cluster.name
                env["CLUSTER_${cluster.id}_CRED"] = cluster.credentialId
            }
            echo "Number of target clusters: ${clusters.size()}"
            clusters.each { cluster ->
                echo "Cluster: ${cluster.name} (${cluster.id})"
            }
        }
    }
}

- Defines the clusters based on the ENVIRONMENT parameter:
  - Non-Production: Targets only the AKS cluster.
  - Production: Targets the GKE and EKS clusters.
- Stores cluster information (IDs, names, and credentials) in environment variables for dynamic referencing.

Prepare Workspace

Groovy
stage('Prepare Workspace') {
    steps {
        script {
            sh """
                mkdir -p ${WORK_DIR}
                mkdir -p ${WORKSPACE}/${ARTIFACTS_DIR}
                rm -f ${WORK_DIR}/* || true
                rm -rf ${WORKSPACE}/${ARTIFACTS_DIR}/* || true
            """
            if (params.ENVIRONMENT == 'Non-Production') {
                sh "rm -rf ${WORKSPACE}/${ARTIFACTS_DIR}/prod-*"
            } else {
                sh "rm -rf ${WORKSPACE}/${ARTIFACTS_DIR}/non-prod-*"
            }
            if (params.SECRETS_YAML) {
                writeFile file: "${WORK_DIR}/secrets.yaml.b64", text: params.SECRETS_YAML
                sh """
                    base64 --decode < ${WORK_DIR}/secrets.yaml.b64 > ${WORK_DIR}/secrets.yaml
                """
            } else {
                error "SECRETS_YAML parameter is not provided"
            }
        }
    }
}

- Creates temporary directories for processing secrets and cleans up old artifacts.
- Decodes the uploaded Base64 Secrets.yaml file and prepares it for processing.
Process Clusters

Groovy
stage('Process Clusters') {
    steps {
        script {
            def clusterIds = env.CLUSTER_IDS.split(',')
            def parallelStages = [:]
            clusterIds.each { clusterId ->
                def clusterName = env["CLUSTER_${clusterId}_NAME"]
                def credentialId = env["CLUSTER_${clusterId}_CRED"]
                parallelStages[clusterName] = {
                    stage("Process ${clusterName}") {
                        withCredentials([file(credentialsId: credentialId, variable: 'KUBECONFIG')]) {
                            def clusterWorkDir = "${WORK_DIR}/${clusterId}"
                            def clusterArtifactsDir = "${WORKSPACE}/${ARTIFACTS_DIR}/${clusterId}"
                            sh """
                                mkdir -p ${clusterWorkDir}
                                mkdir -p ${clusterArtifactsDir}
                                cp ${WORK_DIR}/secrets.yaml ${clusterWorkDir}/
                            """
                            sh """
                                docker run --rm \
                                    -v \${KUBECONFIG}:/tmp/kubeconfig \
                                    -v ${clusterWorkDir}/secrets.yaml:/tmp/secrets.yaml \
                                    -e KUBECONFIG=/tmp/kubeconfig \
                                    --name dind-service-${clusterId} \
                                    ${DOCKER_IMAGE} kubeseal \
                                    --controller-name=${CONTROLLER_NAME} \
                                    --controller-namespace=${CONTROLLER_NAMESPACE} \
                                    --kubeconfig=/tmp/kubeconfig \
                                    --fetch-cert > ${clusterWorkDir}/${CERT_FILE}
                            """
                            sh """
                                docker run --rm \
                                    -v \${KUBECONFIG}:/tmp/kubeconfig \
                                    -v ${clusterWorkDir}/secrets.yaml:/tmp/secrets.yaml \
                                    -v ${clusterWorkDir}/${CERT_FILE}:/tmp/${CERT_FILE} \
                                    -e KUBECONFIG=/tmp/kubeconfig \
                                    --name dind-service-${clusterId} \
                                    ${DOCKER_IMAGE} sh -c "kubeseal \
                                        --controller-name=${CONTROLLER_NAME} \
                                        --controller-namespace=${CONTROLLER_NAMESPACE} \
                                        --format yaml \
                                        --cert /tmp/${CERT_FILE} \
                                        --namespace=${params.NAMESPACE} \
                                        < /tmp/secrets.yaml" > ${clusterArtifactsDir}/sealed-secrets.yaml
                            """
                            sh """
                                echo "Generated on: \$(date)" > ${clusterArtifactsDir}/README.txt
                                echo "Cluster: ${clusterName}" >> ${clusterArtifactsDir}/README.txt
                            """
                        }
                    }
                }
            }
            parallel parallelStages
        }
    }
}

- Dynamically creates parallel stages for each cluster:
  - Fetches cluster-specific certificates using kubeseal.
  - Encrypts the secrets for the target namespace.
- Executes all cluster stages concurrently to optimize time.

Post-Build Actions

Groovy
post {
    always {
        sh "rm -rf ${WORK_DIR}"
        archiveArtifacts artifacts: "${ARTIFACTS_DIR}/*/**", fingerprint: true
    }
    success {
        echo "Pipeline completed successfully!"
    }
    failure {
        echo "Pipeline failed. Check the logs for details."
    }
}

- Cleans up temporary files after execution.
- Archives the generated artifacts (sealed-secrets.yaml and README.txt) for future reference.

Key Advantages

- Dynamic environment setup: Adjusts automatically based on the selected environment.
- Parallel processing: Reduces runtime by concurrently processing clusters.
- Multi-cloud compatibility: Handles AKS, GKE, and EKS seamlessly.
- Secure operations: Protects sensitive data using Kubernetes Sealed Secrets.

This detailed explanation aligns the script with the discussion, showcasing its robust and dynamic capabilities for managing secrets across diverse Kubernetes clusters.