Big Data DevOps With Python

Learn how to run NiPyApi, even from Apache Zeppelin on your HDP cluster, to automate the deployment, versioning, and other operations around Apache NiFi.

A lot of people at the Future of Data meetup have asked how they can automate the deployment, versioning, and other operations around Apache NiFi. Fortunately, there is a project that is current, very easy to use, and now available in version 0.8: NiPyApi, the Python client for Apache NiFi, does it all. As you can see in the screenshot below, you can even run it in Apache Zeppelin if Zeppelin is installed on your HDP cluster.

[Screenshot: NiPyApi running in Apache Zeppelin]

Installation and Setup

Installation is very simple. I have installed NiPyApi on HDP and HDF nodes running CentOS 7 and on my Mac, under both Python 2.7 and 3.x. You will need Python and pip to get started.

Most people can quickly install via:

pip install nipyapi

If that does not work for you, clone the NiPyApi repository and then run:

pip install -r requirements.txt && pip install -r requirements_dev.txt

To quote Dan Chaffelson: "NiPyApi v0.8.0 was released! The Python client for Apache NiFi now supports secured environments, versioning import/export, better templates, better documentation, more demos, and NiFi version backtesting." The release notes are available from the NiPyApi project.

In my opinion, it's the best thing to come out of the UK since Doctor Who. If you have been using an older version, be aware that this release brings some big changes. So buckle up, update, and try out the new demos.

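Set up the configuration and launch the demo console:

import nipyapi
# Point NiPyApi at the NiFi and NiFi Registry REST endpoints
nipyapi.config.nifi_config.host = 'http://localhost:8080/nifi-api'
nipyapi.config.registry_config.host = 'http://localhost:18080/nifi-registry-api'
# Importing the demo console builds demo process groups, buckets, and flows
from nipyapi.demo.console import *
print(nipyapi.system.get_system_diagnostics())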
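Both examples in this article use the same URL layout: the NiFi REST API under `/nifi-api` and the NiFi Registry API under `/nifi-registry-api`; only the host and ports change (8080/18080 on localhost here, 9090/61080 on the HDF cluster later). Here is a minimal sketch of a helper that keeps those strings in one place. `build_endpoints` is a hypothetical name, not part of NiPyApi; you would assign its results to `nipyapi.config.nifi_config.host` and `nipyapi.config.registry_config.host` yourself.

```python
def build_endpoints(host, nifi_port=8080, registry_port=18080, scheme="http"):
    """Return (nifi_url, registry_url) API base URLs for one host.

    Hypothetical helper: the defaults match the local example above;
    pass nifi_port=9090, registry_port=61080 for the HDF cluster example.
    """
    nifi = "{}://{}:{}/nifi-api".format(scheme, host, nifi_port)
    registry = "{}://{}:{}/nifi-registry-api".format(scheme, host, registry_port)
    return nifi, registry


nifi_url, registry_url = build_endpoints("localhost")
print(nifi_url)      # http://localhost:8080/nifi-api
print(registry_url)  # http://localhost:18080/nifi-registry-api
```

`str.format` is used instead of f-strings so the helper also runs under Python 2.7, which the article covers.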

You can now automate everything. Script it all! You may be tempted, as I am, to call these scripts from Apache NiFi itself, but perhaps that is going too far; it may cause a mental recursion I am not prepared for. I installed and updated NiPyApi under both Python 2 and Python 3.

Here is the update on CentOS 7 under Python 3:

pip3 install nipyapi -U
Collecting nipyapi
  Downloading nipyapi-0.8.0-py2.py3-none-any.whl (733kB)
    100% |████████████████████████████████| 737kB 1.1MB/s
Collecting docker (from nipyapi)
  Downloading docker-3.1.1-py2.py3-none-any.whl (121kB)
    100% |████████████████████████████████| 122kB 5.9MB/s
Collecting lxml (from nipyapi)
  Downloading lxml-4.1.1-cp36-cp36m-manylinux1_x86_64.whl (5.6MB)
    100% |████████████████████████████████| 5.6MB 178kB/s
Collecting deepdiff (from nipyapi)
  Downloading deepdiff-3.3.0-py3-none-any.whl
Collecting ruamel.yaml==0.14.12 (from nipyapi)
  Downloading ruamel.yaml-0.14.12-cp36-cp36m-manylinux1_x86_64.whl (542kB)
    100% |████████████████████████████████| 552kB 1.5MB/s
Requirement already up-to-date: requests[security] in /usr/lib/python3.6/site-packages (from nipyapi)
Requirement already up-to-date: urllib3 in /usr/lib/python3.6/site-packages (from nipyapi)
Collecting six (from nipyapi)
  Using cached six-1.11.0-py2.py3-none-any.whl
Collecting docker-pycreds>=0.2.2 (from docker->nipyapi)
  Downloading docker_pycreds-0.2.2-py2.py3-none-any.whl
Collecting websocket-client>=0.32.0 (from docker->nipyapi)
  Downloading websocket_client-0.47.0-py2.py3-none-any.whl (200kB)
    100% |████████████████████████████████| 204kB 3.0MB/s
Collecting jsonpickle (from deepdiff->nipyapi)
  Downloading jsonpickle-0.9.6.tar.gz (67kB)
    100% |████████████████████████████████| 71kB 8.3MB/s
Requirement already up-to-date: chardet<3.1.0,>=3.0.2 in /usr/lib/python3.6/site-packages (from requests[security]->nipyapi)
Requirement already up-to-date: certifi>=2017.4.17 in /usr/lib/python3.6/site-packages (from requests[security]->nipyapi)
Requirement already up-to-date: idna<2.7,>=2.5 in /usr/lib/python3.6/site-packages (from requests[security]->nipyapi)
Collecting pyOpenSSL>=0.14; extra == "security" (from requests[security]->nipyapi)
  Downloading pyOpenSSL-17.5.0-py2.py3-none-any.whl (53kB)
    100% |████████████████████████████████| 61kB 5.2MB/s
Collecting cryptography>=1.3.4; extra == "security" (from requests[security]->nipyapi)
  Downloading cryptography-2.1.4-cp36-cp36m-manylinux1_x86_64.whl (2.2MB)
    100% |████████████████████████████████| 2.2MB 409kB/s
Collecting cffi>=1.7; platform_python_implementation != "PyPy" (from cryptography>=1.3.4; extra == "security"->requests[security]->nipyapi)
  Downloading cffi-1.11.5-cp36-cp36m-manylinux1_x86_64.whl (421kB)
    100% |████████████████████████████████| 430kB 2.5MB/s
Collecting asn1crypto>=0.21.0 (from cryptography>=1.3.4; extra == "security"->requests[security]->nipyapi)
  Downloading asn1crypto-0.24.0-py2.py3-none-any.whl (101kB)
    100% |████████████████████████████████| 102kB 7.8MB/s
Collecting pycparser (from cffi>=1.7; platform_python_implementation != "PyPy"->cryptography>=1.3.4; extra == "security"->requests[security]->nipyapi)
  Downloading pycparser-2.18.tar.gz (245kB)
    100% |████████████████████████████████| 256kB 2.5MB/s
Building wheels for collected packages: jsonpickle, pycparser
  Running setup.py bdist_wheel for jsonpickle ... done
  Stored in directory: /root/.cache/pip/wheels/c8/c5/88/0975a9ef0ae87799d3a4ae54244ca8f76eaaf03395f48a5129
  Running setup.py bdist_wheel for pycparser ... done
  Stored in directory: /root/.cache/pip/wheels/95/14/9a/5e7b9024459d2a6600aaa64e0ba485325aff7a9ac7489db1b6
Successfully built jsonpickle pycparser
Installing collected packages: six, docker-pycreds, websocket-client, docker, lxml, jsonpickle, deepdiff, ruamel.yaml, nipyapi, pycparser, cffi, asn1crypto, cryptography, pyOpenSSL
Successfully installed asn1crypto-0.24.0 cffi-1.11.5 cryptography-2.1.4 deepdiff-3.3.0 docker-3.1.1 docker-pycreds-0.2.2 jsonpickle-0.9.6 lxml-4.1.1 nipyapi-0.8.0 pyOpenSSL-17.5.0 pycparser-2.18 ruamel.yaml-0.14.12 six-1.11.0 websocket-client-0.47.0


Then under Python 2:

pip2 install nipyapi -U
Collecting nipyapi
  Using cached nipyapi-0.8.0-py2.py3-none-any.whl
Collecting lxml (from nipyapi)
  Downloading lxml-4.1.1-cp27-cp27mu-manylinux1_x86_64.whl (5.6MB)
    100% |████████████████████████████████| 5.6MB 155kB/s
Requirement already up-to-date: requests[security] in /usr/lib/python2.7/site-packages (from nipyapi)
Requirement already up-to-date: urllib3 in /usr/lib/python2.7/site-packages (from nipyapi)
Requirement already up-to-date: deepdiff in /usr/lib/python2.7/site-packages/deepdiff-3.3.0-py2.7.egg (from nipyapi)
Collecting ruamel.yaml==0.14.12 (from nipyapi)
  Downloading ruamel.yaml-0.14.12-cp27-cp27mu-manylinux1_x86_64.whl (519kB)
    100% |████████████████████████████████| 522kB 1.3MB/s
Requirement already up-to-date: six in /usr/lib/python2.7/site-packages (from nipyapi)
Requirement already up-to-date: docker in /usr/lib/python2.7/site-packages/docker-3.1.1-py2.7.egg (from nipyapi)
Requirement already up-to-date: certifi>=2017.4.17 in /usr/lib/python2.7/site-packages (from requests[security]->nipyapi)
Requirement already up-to-date: chardet<3.1.0,>=3.0.2 in /usr/lib/python2.7/site-packages (from requests[security]->nipyapi)
Requirement already up-to-date: idna<2.7,>=2.5 in /usr/lib/python2.7/site-packages (from requests[security]->nipyapi)
Collecting cryptography>=1.3.4; extra == "security" (from requests[security]->nipyapi)
  Downloading cryptography-2.1.4-cp27-cp27mu-manylinux1_x86_64.whl (2.2MB)
    100% |████████████████████████████████| 2.2MB 406kB/s
Collecting pyOpenSSL>=0.14; extra == "security" (from requests[security]->nipyapi)
  Using cached pyOpenSSL-17.5.0-py2.py3-none-any.whl
Requirement already up-to-date: jsonpickle in /usr/lib/python2.7/site-packages/jsonpickle-0.9.6-py2.7.egg (from deepdiff->nipyapi)
Requirement already up-to-date: ruamel.ordereddict in /usr/lib/python2.7/site-packages/ruamel.ordereddict-0.4.13-py2.7-linux-x86_64.egg (from ruamel.yaml==0.14.12->nipyapi)
Collecting backports.ssl-match-hostname>=3.5 (from docker->nipyapi)
  Downloading backports.ssl_match_hostname-3.5.0.1.tar.gz
Requirement already up-to-date: docker-pycreds>=0.2.2 in /usr/lib/python2.7/site-packages/docker_pycreds-0.2.2-py2.7.egg (from docker->nipyapi)
Requirement already up-to-date: ipaddress>=1.0.16 in /usr/lib/python2.7/site-packages/ipaddress-1.0.19-py2.7.egg (from docker->nipyapi)
Requirement already up-to-date: websocket-client>=0.32.0 in /usr/lib/python2.7/site-packages/websocket_client-0.47.0-py2.7.egg (from docker->nipyapi)
Collecting cffi>=1.7; platform_python_implementation != "PyPy" (from cryptography>=1.3.4; extra == "security"->requests[security]->nipyapi)
  Downloading cffi-1.11.5-cp27-cp27mu-manylinux1_x86_64.whl (407kB)
    100% |████████████████████████████████| 409kB 1.7MB/s
Collecting enum34; python_version < "3" (from cryptography>=1.3.4; extra == "security"->requests[security]->nipyapi)
  Downloading enum34-1.1.6-py2-none-any.whl
Collecting asn1crypto>=0.21.0 (from cryptography>=1.3.4; extra == "security"->requests[security]->nipyapi)
  Using cached asn1crypto-0.24.0-py2.py3-none-any.whl
Collecting pycparser (from cffi>=1.7; platform_python_implementation != "PyPy"->cryptography>=1.3.4; extra == "security"->requests[security]->nipyapi)
Building wheels for collected packages: backports.ssl-match-hostname
  Running setup.py bdist_wheel for backports.ssl-match-hostname ... done
  Stored in directory: /root/.cache/pip/wheels/5d/72/36/b2a31507b613967b728edc33378a5ff2ada0f62855b93c5ae1
Successfully built backports.ssl-match-hostname
Installing collected packages: lxml, ruamel.yaml, nipyapi, pycparser, cffi, enum34, asn1crypto, cryptography, pyOpenSSL, backports.ssl-match-hostname
  Found existing installation: lxml 3.2.1
    Uninstalling lxml-3.2.1:
      Successfully uninstalled lxml-3.2.1
  Found existing installation: ruamel.yaml 0.15.35
    Uninstalling ruamel.yaml-0.15.35:
      Successfully uninstalled ruamel.yaml-0.15.35
  Found existing installation: nipyapi 0.7.0
    Uninstalling nipyapi-0.7.0:
      Successfully uninstalled nipyapi-0.7.0
  Found existing installation: backports.ssl-match-hostname 3.4.0.2
    Uninstalling backports.ssl-match-hostname-3.4.0.2:
      Successfully uninstalled backports.ssl-match-hostname-3.4.0.2
Successfully installed asn1crypto-0.24.0 backports.ssl-match-hostname-3.5.0.1 cffi-1.11.5 cryptography-2.1.4 enum34-1.1.6 lxml-4.1.1 nipyapi-0.8.0 pyOpenSSL-17.5.0 pycparser-2.18 ruamel.yaml-0.14.12

I ran a short script with this configuration (conso.py) on an HDF server running CentOS 7 and got a nice demo run:

python3.6 conso.py
0.8.0
{'system_diagnostics': {'aggregate_snapshot': {'available_processors': 32,
                                               'content_repository_storage_usage': [{'free_space': '226.77 '
                                                                                                   'GB',
                                                                                     'free_space_bytes': 243491807232,
                                                                                     'identifier': 'default',
                                                                                     'total_space': '249.99 '
                                                                                                    'GB',
                                                                                     'total_space_bytes': 268420476928,
                                                                                     'used_space': '23.22 '
                                                                                                   'GB',
                                                                                     'used_space_bytes': 24928669696,
                                                                                     'utilization': '9.0%'}],
                                               'daemon_threads': 141,
                                               'flow_file_repository_storage_usage': {'free_space': '226.77 '
                                                                                                    'GB',
                                                                                      'free_space_bytes': 243491807232,
                                                                                      'identifier': None,
                                                                                      'total_space': '249.99 '
                                                                                                     'GB',
                                                                                      'total_space_bytes': 268420476928,
                                                                                      'used_space': '23.22 '
                                                                                                    'GB',
                                                                                      'used_space_bytes': 24928669696,
                                                                                      'utilization': '9.0%'},
                                               'free_heap': '218.43 MB',
                                               'free_heap_bytes': 229037208,
                                               'free_non_heap': '28.15 MB',
                                               'free_non_heap_bytes': 29513496,
                                               'garbage_collection': [{'collection_count': 0,
                                                                       'collection_millis': 0,
                                                                       'collection_time': '00:00:00.000',
                                                                       'name': 'G1 '
                                                                               'Old '
                                                                               'Generation'},
                                                                      {'collection_count': 5295,
                                                                       'collection_millis': 362298,
                                                                       'collection_time': '00:06:02.298',
                                                                       'name': 'G1 '
                                                                               'Young '
                                                                               'Generation'}],
                                               'heap_utilization': '57.0%',
                                               'max_heap': '512 MB',
                                               'max_heap_bytes': 536870912,
                                               'max_non_heap': '-1 bytes',
                                               'max_non_heap_bytes': -1,
                                               'non_heap_utilization': None,
                                               'processor_load_average': 0.54,
                                               'provenance_repository_storage_usage': [{'free_space': '226.77 '
                                                                                                      'GB',
                                                                                        'free_space_bytes': 243491807232,
                                                                                        'identifier': 'default',
                                                                                        'total_space': '249.99 '
                                                                                                       'GB',
                                                                                        'total_space_bytes': 268420476928,
                                                                                        'used_space': '23.22 '
                                                                                                      'GB',
                                                                                        'used_space_bytes': 24928669696,
                                                                                        'utilization': '9.0%'}],
                                               'stats_last_refreshed': '16:08:54 '
                                                                       'UTC',
                                               'total_heap': '512 MB',
                                               'total_heap_bytes': 536870912,
                                               'total_non_heap': '494.65 MB',
                                               'total_non_heap_bytes': 518676480,
                                               'total_threads': 201,
                                               'uptime': '87:41:10.346',
                                               'used_heap': '293.57 MB',
                                               'used_heap_bytes': 307833704,
                                               'used_non_heap': '466.5 MB',
                                               'used_non_heap_bytes': 489162984,
                                               'version_info': {'build_branch': 'UNKNOWN',
                                                                'build_revision': 'd5f3eef',
                                                                'build_tag': 'nifi-1.5.0-RC1',
                                                                'build_timestamp': '01/30/2018 '
                                                                                   '23:17:15 '
                                                                                   'UTC',
                                                                'java_vendor': 'Oracle '
                                                                               'Corporation',
                                                                'java_version': '1.8.0_112',
                                                                'ni_fi_version': '1.5.0.3.1.0.0-564',
                                                                'os_architecture': 'amd64',
                                                                'os_name': 'Linux',
                                                                'os_version': '3.10.0-693.17.1.el7.x86_64'}},
                        'node_snapshots': None}}
INFO:nipyapi.demo.console:Setting up NiPyApi Demo Console
INFO:nipyapi.demo.console:Cleaning up old NiPyApi Console Process Groups
INFO:nipyapi.demo.console:Creating process_group_0 as an empty process group name nipyapi_console_process_group_0
INFO:nipyapi.demo.console:Cleaning up old NiPyApi Console Processors
INFO:nipyapi.demo.console:Creating processor_0 as a new GenerateFlowFile named nipyapi_console_processor_0 in the previous ProcessGroup
INFO:nipyapi.demo.console:Creating reg_client_0 as NiFi Registry Client
INFO:nipyapi.demo.console:Cleaning up old NiPyApi Console Registry Buckets
INFO:nipyapi.demo.console:Creating bucket_0 as new a Registry Bucket named nipyapi_console_bucket_0
INFO:nipyapi.demo.console:Creating bucket_1 as new a Registry Bucket named nipyapi_console_bucket_1
INFO:nipyapi.demo.console:Saving nipyapi_console_process_group_0 as a new Versioned Flow named nipyapi_console_ver_flow_0 in Bucket nipyapi_console_bucket_0, and saving as variable ver_flow_info_0
INFO:nipyapi.demo.console:Creating ver_flow_0 as a copy of the new Versioned Flow object
INFO:nipyapi.demo.console:Creating ver_flow_snapshot_0 as a copy of the new Versioned FlowSnapshot
INFO:nipyapi.demo.console:Creating ver_flow_1 as an empty Versioned Flow stub named nipyapi_console_ver_flow_1 in nipyapi_console_bucket_1
INFO:nipyapi.demo.console:Creating ver_flow_snapshot_1 by cloning the first snapshot nipyapi_console_ver_flow_0 into the new Versioned Flow Stub nipyapi_console_ver_flow_1
INFO:nipyapi.demo.console:Creating ver_flow_raw_0 as a raw Json export of nipyapi_console_ver_flow_0
INFO:nipyapi.demo.console:Creating ver_flow_json_0 as a sorted pretty Json export of nipyapi_console_ver_flow_0
INFO:nipyapi.demo.console:Creating ver_flow_yaml_0 as a sorted pretty Yaml export of nipyapi_console_ver_flow_0
INFO:nipyapi.demo.console:Creating flow_template_0 as a new Template from Process Group nipyapi_console_process_group_0
Demo Console deployed!
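The diagnostics printout above is a nested dictionary, so a monitoring script needs only a few key lookups to turn it into health checks. The sketch below assumes you already hold that dictionary (the swagger-generated objects NiPyApi returns should expose it via `to_dict()`). `summarize_diagnostics`, the 80% threshold, and the trimmed sample are illustrative, with values copied from the run above.

```python
def percent(value):
    """Convert strings like '57.0%' to floats; None stays None."""
    return None if value is None else float(value.rstrip("%"))


def summarize_diagnostics(diag):
    """Pick a few health indicators out of a system diagnostics dict.

    Assumes the nested layout printed above:
    diag['system_diagnostics']['aggregate_snapshot'][...]
    """
    snap = diag["system_diagnostics"]["aggregate_snapshot"]
    return {
        "nifi_version": snap["version_info"]["ni_fi_version"],
        "uptime": snap["uptime"],
        "heap_pct": percent(snap["heap_utilization"]),
        "flowfile_repo_pct": percent(
            snap["flow_file_repository_storage_usage"]["utilization"]),
        # Content storage is a list (one entry per repository); keep the fullest
        "content_repo_pct": max(
            percent(r["utilization"])
            for r in snap["content_repository_storage_usage"]),
        "threads": snap["total_threads"],
    }


# Trimmed sample built from the values printed above
sample = {"system_diagnostics": {"aggregate_snapshot": {
    "version_info": {"ni_fi_version": "1.5.0.3.1.0.0-564"},
    "uptime": "87:41:10.346",
    "heap_utilization": "57.0%",
    "flow_file_repository_storage_usage": {"utilization": "9.0%"},
    "content_repository_storage_usage": [{"utilization": "9.0%"}],
    "total_threads": 201,
}}}

summary = summarize_diagnostics(sample)
if summary["heap_pct"] > 80:  # illustrative alert threshold
    print("Heap is getting tight:", summary["heap_pct"])
print(summary)
```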

Here are a few operations that might be handy in a typical enterprise Apache NiFi 1.5 (HDF 3.1) environment.

For more DevOps activities, see the NiPyApi documentation.

List all templates:

import nipyapi 
nipyapi.config.nifi_config.host = 'http://server:9090/nifi-api' 
nipyapi.config.registry_config.host = 'http://server:61080/nifi-registry-api' 
print(nipyapi.templates.list_all_templates())

Example output (truncated; only the last template is shown):

               {'bulletins': None,
                'id': '21e95277-98ae-38ce-9235-bbf90772cef3',
                'permissions': {'can_read': True, 'can_write': True},
                'position': None,
                'revision': None,
                'template': {'description': '',
                             'encoding_version': None,
                             'group_id': '7c84501d-d10c-407c-b9f3-1d80e38fe36a',
                             'id': '21e95277-98ae-38ce-9235-bbf90772cef3',
                             'name': 'ApacheMxNet Local Processing',
                             'snippet': None,
                             'timestamp': '03/07/2018 17:48:06 UTC',
                             'uri': 'http://princeton1.field.hortonworks.com:9090/nifi-api/templates/21e95277-98ae-38ce-9235-bbf90772cef3'},
                'uri': None}]}
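Most template operations (such as `export_template(t_id, ...)`) want the template id rather than its name. Below is a small lookup over the listing, sketched under the assumption that its dictionary form has a top-level `templates` list whose entries look like the fragment above. `find_template_id` is a hypothetical helper, not a NiPyApi function.

```python
def find_template_id(templates_dict, name):
    """Return the id of the first template whose name matches, else None.

    Assumes the dict form of the template listing:
    {'templates': [{'id': ..., 'template': {'name': ..., ...}}, ...]}
    """
    for entry in templates_dict.get("templates") or []:
        template = entry.get("template") or {}
        if template.get("name") == name:
            return entry.get("id")
    return None


listing = {"templates": [{
    "id": "21e95277-98ae-38ce-9235-bbf90772cef3",
    "template": {"name": "ApacheMxNet Local Processing",
                 "id": "21e95277-98ae-38ce-9235-bbf90772cef3"},
}]}

t_id = find_template_id(listing, "ApacheMxNet Local Processing")
print(t_id)  # 21e95277-98ae-38ce-9235-bbf90772cef3
```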

Upload a template:

nipyapi.templates.upload_template(pg_id, template_file)

Create a template from a process group:

nipyapi.templates.create_template(pg_id, name, desc='')

Delete a process group:

nipyapi.canvas.delete_process_group(process_group, force=False, refresh=True)

List all process groups:

nipyapi.canvas.list_all_process_groups()

Create a processor in a process group:

nipyapi.canvas.create_processor(parent_pg, processor, location, name=None, config=None)

Delete a processor from a process group:

nipyapi.canvas.delete_processor(processor, refresh=True, force=False)

Start or stop a processor (if possible):

nipyapi.canvas.schedule_processor(processor, scheduled, refresh=True)

Update one or more key-value pairs in a process group's variable registry:

nipyapi.canvas.update_variable_registry(process_group, update)

You can create a registry client on the NiFi server:

nipyapi.versioning.create_registry_client(name, uri, description)

List the existing registry clients (the more common case, since an administrator has usually already connected one or more):

print(nipyapi.versioning.list_registry_clients())

Example output:

{'registries': [{'bulletins': None,
                 'component': {'description': 'NiPyApi Demo Console',
                               'id': 'adc0435b-3eb5-1e1a-ffff-fffff521cba3',
                               'name': 'nipyapi_console_reg_client_0',
                               'uri': 'http://princeton1.field.hortonworks.com:61080/'},
                 'id': 'adc0435b-3eb5-1e1a-ffff-fffff521cba3',
                 'permissions': {'can_read': True, 'can_write': True},
                 'position': None,
                 'revision': {'client_id': 'adc0439f-3eb5-1e1a-6ca0-baadd98fcdf6',
                              'last_modifier': None,
                              'version': 2},
                 'uri': 'http://princeton1.field.hortonworks.com:9090/nifi-api/controller/registry-clients/adc0435b-3eb5-1e1a-ffff-fffff521cba3'}]}
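Because an administrator has usually connected a registry already, an automation script can check the listing before trying to create a client, which makes re-runs idempotent. This is a minimal sketch over the dictionary shape printed above. `find_registry_client` is a hypothetical helper, not part of NiPyApi.

```python
def find_registry_client(registries_dict, name):
    """Return (id, uri) for the registry client called `name`, or None.

    Works on the dict form of the listing shown above:
    {'registries': [{'id': ..., 'component': {'name': ..., 'uri': ...}}]}
    """
    for entry in registries_dict.get("registries") or []:
        component = entry.get("component") or {}
        if component.get("name") == name:
            return entry["id"], component.get("uri")
    return None


clients = {"registries": [{
    "id": "adc0435b-3eb5-1e1a-ffff-fffff521cba3",
    "component": {"name": "nipyapi_console_reg_client_0",
                  "uri": "http://princeton1.field.hortonworks.com:61080/"},
}]}

found = find_registry_client(clients, "nipyapi_console_reg_client_0")
if found is None:
    print("No such client yet; create_registry_client would be needed")
else:
    print("Reusing registry client", found[0])
```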

List the buckets in the registry:

print(nipyapi.versioning.list_registry_buckets())

Create a new version registry bucket:

nipyapi.versioning.create_registry_bucket(name)

Save a flow version to registry:

rgcid = 'nipyapi_console_reg_client_0'
registry_client = nipyapi.versioning.get_registry_client(rgcid, identifier_type='name')
identifier = 'nipyapi_console_process_group_0'
process_group = nipyapi.canvas.get_process_group(identifier, identifier_type='name')
bucketid = 'meetup'
bucket = nipyapi.versioning.get_registry_bucket(bucketid, identifier_type='name')
print(nipyapi.versioning.save_flow_ver(process_group, registry_client, bucket, flow_name='built by python', flow_id=None, comment='automated', desc='automated', refresh=True))

The script looks up the registry client, the process group, and the bucket (here called meetup) by name, then saves a new version of the flow into that bucket.

Example output:

{'process_group_revision': {'client_id': 'adc045ac-3eb5-1e1a-9861-68c932e967e9',
                            'last_modifier': None,
                            'version': 3},
 'version_control_information': {'bucket_id': 'd80946af-64be-471a-baf8-d85fa98d0a19',
                                 'bucket_name': 'meetup',
                                 'flow_description': 'automated',
                                 'flow_id': '64f37030-6c0f-4272-b16c-cb8a0119e776',
                                 'flow_name': 'built by python',
                                 'group_id': 'e2794288-daa1-173a-ffff-ffffdadbf3f8',
                                 'registry_id': 'adc0435b-3eb5-1e1a-ffff-fffff521cba3',
                                 'registry_name': 'nipyapi_console_reg_client_0',
                                 'state': 'UP_TO_DATE',
                                 'state_explanation': 'Flow version is current',
                                 'version': 1}}
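The save response carries what the next steps need. `bucket_id` and `flow_id` are exactly the first two arguments of `export_flow_version`, and `state` tells you whether the process group is in sync with the registry. Below is a sketch that pulls them out of the dictionary shown above. `check_versioned` is a hypothetical helper, and the sample is trimmed from the example output.

```python
def check_versioned(vci_result):
    """Return (bucket_id, flow_id, version, up_to_date) from the
    'version_control_information' section of a save response dict."""
    info = vci_result["version_control_information"]
    return (info["bucket_id"], info["flow_id"], info["version"],
            info["state"] == "UP_TO_DATE")


result = {"version_control_information": {
    "bucket_id": "d80946af-64be-471a-baf8-d85fa98d0a19",
    "flow_id": "64f37030-6c0f-4272-b16c-cb8a0119e776",
    "flow_name": "built by python",
    "state": "UP_TO_DATE",
    "version": 1,
}}

bucket_id, flow_id, version, current = check_versioned(result)
if not current:
    raise RuntimeError("flow %s is not in sync with the registry" % flow_id)
print(bucket_id, flow_id, version)
```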

Export a registry flow version:

nipyapi.versioning.export_flow_version(bucket_id, flow_id, version=None, file_path=None, mode='json')
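Once you export two versions of a flow in JSON mode, you can compare them to see what changed between releases. NiPyApi already depends on deepdiff (see the install log above), which does this thoroughly. The sketch below is a dependency-free stand-in that lists the paths whose values differ. `diff_paths` and the toy documents are illustrative, not a real flow export.

```python
import json


def diff_paths(old, new, path=""):
    """Return the dotted paths whose values differ between two parsed
    JSON documents (nested dicts, lists, and scalars)."""
    if isinstance(old, dict) and isinstance(new, dict):
        changes = []
        for key in set(old) | set(new):
            sub = "{}.{}".format(path, key) if path else str(key)
            if key not in old or key not in new:
                changes.append(sub)  # key added or removed
            else:
                changes.extend(diff_paths(old[key], new[key], sub))
        return sorted(changes)
    if isinstance(old, list) and isinstance(new, list) and len(old) == len(new):
        changes = []
        for i, (a, b) in enumerate(zip(old, new)):
            changes.extend(diff_paths(a, b, "{}[{}]".format(path, i)))
        return changes
    # Scalars, type changes, or lists of different length
    return [] if old == new else [path or "<root>"]


v1 = json.loads('{"name": "built by python", "processors": [{"name": "gen"}]}')
v2 = json.loads('{"name": "built by python", "processors": [{"name": "gen2"}]}')
print(diff_paths(v1, v2))  # ['processors[0].name']
```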

Get the variable registry from a process group:

import nipyapi
nipyapi.config.nifi_config.host = 'http://server:9090/nifi-api' 
nipyapi.config.registry_config.host = 'http://server:61080/nifi-registry-api' 
identifier = 'nipyapi_console_process_group_0' 
process_group = nipyapi.canvas.get_process_group(identifier, identifier_type='name') 
print(nipyapi.canvas.get_variable_registry(process_group, ancestors=True))

Example output:

{'process_group_revision': {'client_id': 'adc045ac-3eb5-1e1a-9861-68c932e967e9',
                            'last_modifier': None,
                            'version': 2},
 'variable_registry': {'process_group_id': 'e2794288-daa1-173a-ffff-ffffdadbf3f8',
                       'variables': [{'can_write': True,
                                      'variable': {'affected_components': [],
                                                   'name': 'testsqlconnection',
                                                   'process_group_id': 'e2794288-daa1-173a-ffff-ffffdadbf3f8',
                                                   'value': 'jdbc=fun:8080'}}]}}
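To drive the variable registry from configuration, it helps to flatten the structure above into a plain mapping and send only what actually changed. The sketch assumes that `update_variable_registry` accepts its `update` argument as a list of `(name, value)` tuples. Check the docstring of your installed version before relying on that. `registry_to_dict`, `pending_updates`, and the `batch_size` variable are hypothetical.

```python
def registry_to_dict(var_registry):
    """Flatten the dict form of a variable registry response into {name: value}."""
    return {
        v["variable"]["name"]: v["variable"]["value"]
        for v in var_registry["variable_registry"]["variables"]
    }


def pending_updates(current, desired):
    """List (name, value) pairs from `desired` that are missing or different
    in `current`, sorted by name for stable output."""
    return sorted(
        (name, value) for name, value in desired.items()
        if current.get(name) != value
    )


registry = {"variable_registry": {"variables": [
    {"can_write": True,
     "variable": {"name": "testsqlconnection", "value": "jdbc=fun:8080"}},
]}}

current = registry_to_dict(registry)
updates = pending_updates(current, {"testsqlconnection": "jdbc=fun:8080",
                                    "batch_size": "100"})
print(updates)  # [('batch_size', '100')]
```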

Export a template as a string:

t_id = '21e95277-98ae-38ce-9235-bbf90772cef3'
print(nipyapi.templates.export_template(t_id, output='string'))

You can also export to a file. As you can imagine, if you combined this with some Python manipulation magic (and perhaps some deep learning), you could take a template, back it up, change it, add to it, upload it, deploy it, and version it, moving flows to and from Apache NiFi Registry and performing other useful ops along the way.
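A simple first step toward that workflow is keeping timestamped copies of every exported template. This is a minimal sketch, written for Python 3, assuming the export came back as an XML string (`output='string'` above). `backup_template` and the tiny XML sample are hypothetical, and two backups of the same template within one second would overwrite each other.

```python
import io
import os
import tempfile
import time


def backup_template(xml_text, name, backup_dir):
    """Write an exported template (an XML string) to a timestamped file.

    Hypothetical helper: `name` only shapes the file name.
    Returns the path that was written.
    """
    if not os.path.isdir(backup_dir):
        os.makedirs(backup_dir)
    # Replace spaces and other unsafe characters in the file name
    safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in name)
    stamp = time.strftime("%Y%m%d-%H%M%S")
    path = os.path.join(backup_dir, "{}_{}.xml".format(safe, stamp))
    with io.open(path, "w", encoding="utf-8") as fh:
        fh.write(xml_text)
    return path


xml = "<template><name>ApacheMxNet Local Processing</name></template>"
saved = backup_template(xml, "ApacheMxNet Local Processing", tempfile.mkdtemp())
print(saved)
```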

Get Apache NiFi version info:

print(nipyapi.system.get_nifi_version_info())

Example output:

{'build_branch': 'UNKNOWN',
 'build_revision': 'd5f3eef',
 'build_tag': 'nifi-1.5.0-RC1',
 'build_timestamp': u'01/30/2018 23:17:15 UTC',
 'java_vendor': 'Oracle Corporation',
 'java_version': '1.8.0_112',
 'ni_fi_version': '1.5.0.3.1.0.0-564',
 'os_architecture': 'amd64',
 'os_name': 'Linux',
 'os_version': '3.10.0-693.17.1.el7.x86_64'}
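On HDF, `ni_fi_version` packs two versions together: `1.5.0.3.1.0.0-564` reads as Apache NiFi 1.5.0 in the HDF 3.1 line, build 564, matching the NiFi 1.5 (HDF 3.1) environment mentioned earlier. Flow versioning through NiFi Registry first shipped in Apache NiFi 1.5.0, so a script can check this before calling the versioning functions. Below is a sketch that assumes that vendor layout. `parse_nifi_version` and `supports_registry` are hypothetical.

```python
def parse_nifi_version(ni_fi_version):
    """Split a version string such as '1.5.0.3.1.0.0-564' into
    (nifi_release, vendor_release, build).

    Assumes the first three numbers are the Apache NiFi release and any
    remaining numbers belong to the vendor (HDF) release.
    """
    release, _, build = ni_fi_version.partition("-")
    parts = [int(p) for p in release.split(".")]
    return tuple(parts[:3]), tuple(parts[3:]), build or None


def supports_registry(ni_fi_version):
    """NiFi Registry integration arrived in Apache NiFi 1.5.0."""
    nifi, _, _ = parse_nifi_version(ni_fi_version)
    return nifi >= (1, 5, 0)


nifi, hdf, build = parse_nifi_version("1.5.0.3.1.0.0-564")
print(nifi, hdf, build)            # (1, 5, 0) (3, 1, 0, 0) 564
print(supports_registry("1.4.0"))  # False
```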

Get cluster info (experimental):

print(nipyapi.system.get_cluster())

Example output:

{'cluster': {'generated': '17:57:41 UTC',
             'nodes': [{'active_thread_count': 5,
                        'address': 'princeton1.field.hortonworks.com',
                        'api_port': 9090,
                        'connection_requested': None,
                        'events': [{'category': 'INFO',
                                    'message': 'Received first heartbeat from connecting node. Node connected.',
                                    'timestamp': '03/06/2018 00:33:58 UTC'},
                                   {'category': 'INFO',
                                    'message': 'Received heartbeat but ignoring because it was reported before the node was last asked to reconnect.',
                                    'timestamp': '03/06/2018 00:33:53 UTC'},
                                   {'category': 'INFO',
                                    'message': 'Connection requested from existing node. Setting status to connecting.',
                                    'timestamp': '03/06/2018 00:33:48 UTC'},
                                   {'category': 'INFO',
                                    'message': 'Requesting that node connect to cluster',
                                    'timestamp': '03/06/2018 00:33:48 UTC'},
                                   {'category': 'INFO',
                                    'message': 'Received heartbeat from node previously disconnected due to Has Not Yet Connected to Cluster. Issuing reconnection request.',
                                    'timestamp': '03/06/2018 00:33:48 UTC'}],
                        'heartbeat': '03/09/2018 17:57:38 UTC',
                        'node_id': '1b6eb3aa-add8-476d-a91a-5b500f6515f1',
                        'node_start_time': '03/06/2018 00:28:21 UTC',
                        'queued': '3,865 / 12.9 MB',
                        'roles': ['Primary Node', 'Cluster Coordinator'],
                        'status': 'CONNECTED'}]}}
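For a health check before deploying, the cluster response boils down to one row per node. Below is a sketch over the dictionary shape shown above (keys `address`, `status`, `roles`). `cluster_health` is hypothetical, and because `get_cluster()` is marked experimental, the shape may change between releases.

```python
def cluster_health(cluster_dict):
    """Return (all_connected, rows) for the dict form of a cluster response.

    rows holds one (address, status, roles) tuple per node.
    """
    nodes = cluster_dict["cluster"].get("nodes") or []
    rows = [(n["address"], n["status"], n.get("roles") or []) for n in nodes]
    # An empty node list is treated as unhealthy rather than trivially OK
    all_connected = bool(rows) and all(row[1] == "CONNECTED" for row in rows)
    return all_connected, rows


sample = {"cluster": {"nodes": [{
    "address": "princeton1.field.hortonworks.com",
    "status": "CONNECTED",
    "roles": ["Primary Node", "Cluster Coordinator"],
}]}}

ok, rows = cluster_health(sample)
primary = [addr for addr, _, roles in rows if "Primary Node" in roles]
print(ok, primary)
```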

For the really brave: purge it!

nipyapi.canvas.purge_connection(con_id)

nipyapi.canvas.purge_process_group(process_group, stop=False)

nipyapi.canvas.delete_process_group(process_group, force=True, refresh=True)
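Because purging is destructive, it is worth selecting targets by a naming convention and reviewing the list first, much as the demo console only cleans up objects prefixed `nipyapi_console_`. Below is a dry-run sketch that assumes each process group is a dictionary shaped like `{'id': ..., 'component': {'name': ...}}`. `select_for_purge` and the second group are hypothetical, and nothing here calls NiFi.

```python
def select_for_purge(process_groups, prefix):
    """Return (id, name) for each process group whose name starts with
    `prefix`; a review step before any purge or delete call."""
    selected = []
    for pg in process_groups:
        name = (pg.get("component") or {}).get("name") or ""
        if name.startswith(prefix):
            selected.append((pg["id"], name))
    return selected


groups = [
    {"id": "e2794288-daa1-173a-ffff-ffffdadbf3f8",
     "component": {"name": "nipyapi_console_process_group_0"}},
    {"id": "hypothetical-id-2", "component": {"name": "production_ingest"}},
]

# Dry run: print what would be purged; only pass the reviewed entities on
for pg_id, name in select_for_purge(groups, "nipyapi_console_"):
    print("would purge", name, pg_id)
```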

There is also a large security section that you should check out if your environment is secured.

One of the cool things that almost goes without saying is that the NiFi REST API speaks JSON, and NiPyApi's results convert readily to Python dictionaries. That makes them easy to work with in Python as well as in Apache NiFi and most other tools.

Opinions expressed by DZone contributors are their own.
