
Accessing Feeds From EtherDelta on Trades, Funds, Buys, and Sells (Apache NiFi)


Learn how to read Ethereum feeds from EtherDelta, process them with Apache NiFi 1.6+, and make the data available to data scientists.


Accessing Feeds From EtherDelta on Trades, Funds, Buys, and Sells (Cryptocurrency Analysis)

EtherDelta lets you trade Ether or Ethereum-based tokens. Ethereum is an open-source blockchain platform for running smart contracts. EtherDelta provides a fast WebSocket feed of all data coming through its system. We can tap this WebSocket feed with Apache NiFi to examine and ingest all the trades, funds, buys, and sells as JSON. Once we ingest, clean up, parse, and schematize the data, we can run queries on it with Apache Spark SQL and Apache Hive in Apache Zeppelin notebooks. The data scientists then have a continuous stream of data to work with. Next, we will start adding feeds from other Ethereum exchanges, Bitcoin APIs, and other sources of data. Some of the APIs are REST, some are WebSockets, and some are SDKs. All of these are easy to ingest with Apache NiFi.


Initial Ingest From EtherDelta via the WebSocket API

Route To The Correct Type

To Process This Feed With Apache NiFi 1.6+:

  1. ConnectWebSocket - wss://socket.etherdelta.com/socket.io/?transport=websocket
  2. ReplaceText (three processors) - remove extraneous data outside of the JSON (40, 42, 0 - WebSocket junk text)
  3. RouteOnAttribute - filter out files that are too small
  4. RouteOnContent - route sells, buys, trades, and funds
  5. SplitJson - split out the buys ($.*.buys) or sells ($.*.sells) arrays
  6. SplitJson - split buys and sells into individual JSON records ($.[*])
  7. UpdateAttribute - add a schema name
  8. Send to a remote cluster via HTTP/HTTPS
  9. On the remote cluster, process and query to limit the data and convert it to a Hive-friendly format
  10. Store in HDFS
  11. NiFi generates DDL for an external Hive table
  12. Query with Zeppelin
  13. Hand off to another Zeppelin user, a data scientist, for machine learning and statistics
  14. Profit!
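To make steps 2 through 4 concrete outside of NiFi, here is a minimal Python sketch of the same idea: strip the numeric Socket.IO packet prefix, drop payloads that are too small, and route on content. The sample frames, the size threshold, and the routing keywords are assumptions made for illustration; they are not taken from the actual flow configuration.

```python
import json
import re

# Leading digits are the Engine.IO / Socket.IO packet-type codes ("0", "40", "42").
PACKET_PREFIX = re.compile(r"^\d+")

# Hypothetical routing keywords, standing in for the RouteOnContent match rules.
ROUTES = {
    "buys": '"buys"',
    "sells": '"sells"',
    "trades": '"trades"',
    "funds": '"funds"',
}


def strip_prefix(frame: str) -> str:
    """Remove the numeric packet prefix so only the JSON payload remains."""
    return PACKET_PREFIX.sub("", frame, count=1)


def route(frame: str, min_size: int = 20):
    """Return (payload, matched route names); tiny payloads match nothing."""
    payload = strip_prefix(frame)
    if len(payload) < min_size:  # assumed threshold, like the small-file filter
        return payload, []
    matched = [name for name, needle in ROUTES.items() if needle in payload]
    return payload, matched


# Made-up frames shaped like Socket.IO traffic, not captured from EtherDelta.
frames = ['40', '42["orders",{"buys":[{"id":"a_buy"}],"sells":[]}]']
for frame in frames:
    payload, routes = route(frame)
    parsed = json.loads(payload) if routes else None  # confirm the payload is valid JSON
    print(routes, parsed)
```

In NiFi the same work is spread across separate processors, which keeps each rule visible and adjustable on the canvas.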

Break Apart Sell Records

Setup Our WebSocket Client

Macintosh Default Java Certificate for SSL (default password is changeit)

Connect to the WebSocket

Remove the extra '40'

Throw away small files!

Route for Buys, Funds, Sells, and Trades

EtherDelta Exchange

EtherDelta provides a WebSocket feed of their data, so I am ingesting that with Apache NiFi and breaking out different types of data being published. This will let us ingest into different Apache Hive tables and run some queries and analytics in Apache Zeppelin on this data. We can then make it available to Data Scientists.

JsonPath Expressions

JsonPath Expression for trades and funds
$.*.* 

Split Json Orders Buys
$.*.buys 

Split Json Orders Sells
$.*.sells
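For readers less familiar with JsonPath, the following Python sketch mimics what these expressions select, using plain dictionaries and lists instead of a JsonPath library. The message layout (an event name followed by an object holding `buys` and `sells` arrays) is an assumption inferred from the expressions, and the helper names are hypothetical.

```python
import json


def select_children(node):
    """Mimic the JsonPath wildcard step `.*`: all child values of a dict or list."""
    if isinstance(node, dict):
        return list(node.values())
    if isinstance(node, list):
        return list(node)
    return []  # scalars such as the event name have no children


def select_key(nodes, key):
    """Mimic a named step such as `.buys` applied to every candidate node."""
    return [n[key] for n in nodes if isinstance(n, dict) and key in n]


# Made-up message in the assumed shape, not real EtherDelta data.
message = json.loads(
    '["orders", {"buys": [{"id": "a_buy"}, {"id": "b_buy"}],'
    ' "sells": [{"id": "c_sell"}]}]'
)

# $.*.buys : the buys array (wrapped in a result list, one entry per match)
buys_arrays = select_key(select_children(message), "buys")

# $.[*] applied to each array : one record per order
buy_records = [rec for arr in buys_arrays for rec in select_children(arr)]

# $.*.* : every grandchild of the root, as used for trades and funds
grandchildren = [g for c in select_children(message) for g in select_children(c)]

print(buys_arrays)
print(buy_records)
```

Because the first expression yields an array of orders rather than individual orders, a second split is what produces one record per flow file.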


Schemas

ethereumfunds
{ "type" : "record", "name" : "ethereumfunds", "fields" : [ { "name" : "txHash", "type" : "string", "doc" : "Type inferred from '\"0xc40e227f3d5c2e125791cd865e8dd36c4a1a86538f13905ce91d65d2ac721742\"'" }, { "name" : "date", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:52:19.000Z\"'" }, { "name" : "tokenAddr", "type" : "string", "doc" : "Type inferred from '\"0xfb1e5f5e984c28ad7e228cdaa1f8a0919bb6a09b\"'" }, { "name" : "kind", "type" : "string", "doc" : "Type inferred from '\"Deposit\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0xb975cf6c40f9cc5ad5cb7a335f16bdaab6cdcf0d\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"5986.826\"'" }, { "name" : "balance", "type" : "string", "doc" : "Type inferred from '\"5986.826\"'" } ] }
ethereumtrades
{ "type" : "record", "name" : "ethereumtrades", "fields" : [ { "name" : "txHash", "type" : "string", "doc" : "Type inferred from '\"0x0350b7b479c9372c07188d69aa642ced7637b05444735653d61316dd852a673c\"'" }, { "name" : "date", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T18:32:27.000Z\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.000049874\"'" }, { "name" : "side", "type" : "string", "doc" : "Type inferred from '\"buy\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"8800\"'" }, { "name" : "amountBase", "type" : "string", "doc" : "Type inferred from '\"0.4388912\"'" }, { "name" : "buyer", "type" : "string", "doc" : "Type inferred from '\"0xd170db528cd2dd6ca67b0b2e3f7cd6e24942dba2\"'" }, { "name" : "seller", "type" : "string", "doc" : "Type inferred from '\"0xecfd625bfc433e8f6c8ce4abb92b9e8f1db3e401\"'" }, { "name" : "tokenAddr", "type" : "string", "doc" : "Type inferred from '\"0x6888a16ea9792c15a4dcf2f6c623d055c8ede792\"'" } ] }
ethereumbuy
{ "type" : "record", "name" : "ethereumbuy", "fields" : [ { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"9c946737c29c807255c3aac7334e182e375cc3a32684c66ccda03e9f5c52e47e_buy\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.0002743\"'" }, { "name" : "tokenGet", "type" : "string", "doc" : "Type inferred from '\"0x99ea4db9ee77acd40b119bd1dc4e33e1c070b80d\"'" }, { "name" : "amountGet", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "tokenGive", "type" : "string", "doc" : "Type inferred from '\"0x0000000000000000000000000000000000000000\"'" }, { "name" : "amountGive", "type" : "string", "doc" : "Type inferred from '\"1880000083000000000\"'" }, { "name" : "expires", "type" : "string", "doc" : "Type inferred from '\"5590043\"'" }, { "name" : "nonce", "type" : "string", "doc" : "Type inferred from '\"7586490717308181\"'" }, { "name" : "v", "type" : "int", "doc" : "Type inferred from '28'" }, { "name" : "r", "type" : "string", "doc" : "Type inferred from '\"0xd04a8f0a1f86fe8e3bdefb717f1bc461cfdb998f705dff3a5fc5567d023ca116\"'" }, { "name" : "s", "type" : "string", "doc" : "Type inferred from '\"0x7a8b687a8b7faea852ac873c02428acd5c26457282a4105fe12b60776fd87d55\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0x0b419BCE1Cb87ADEa84A913Fa903593fB68D33B1\"'" }, { "name" : "updated", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:41:10.058Z\"'" }, { "name" : "availableVolume", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "ethAvailableVolume", "type" : "string", "doc" : "Type inferred from '\"6853.81\"'" }, { "name" : "availableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"1880000083000000000\"'" }, { "name" : "ethAvailableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"1.880000083\"'" }, { "name" : "amountFilled", "type" : "null", "doc" : "Type inferred from 'null'" } ] }
ethereumsell
{ "type" : "record", "name" : "ethereumsell", "fields" : [ { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"27b06f4b8caf4aaa6d05841f8daa077f5f2131145331489ae94febc5eddd8c56_sell\"'" }, { "name" : "deleted", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"-1.1627879935162941e+21\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.0004473\"'" }, { "name" : "tokenGet", "type" : "string", "doc" : "Type inferred from '\"0x0000000000000000000000000000000000000000\"'" }, { "name" : "amountGet", "type" : "string", "doc" : "Type inferred from '\"520115069499838335\"'" }, { "name" : "tokenGive", "type" : "string", "doc" : "Type inferred from '\"0xe3818504c1B32bF1557b16C238B2E01Fd3149C17\"'" }, { "name" : "amountGive", "type" : "string", "doc" : "Type inferred from '\"1162787993516294126989\"'" }, { "name" : "expires", "type" : "string", "doc" : "Type inferred from '\"5589988\"'" }, { "name" : "nonce", "type" : "string", "doc" : "Type inferred from '\"57125161\"'" }, { "name" : "v", "type" : "int", "doc" : "Type inferred from '28'" }, { "name" : "r", "type" : "string", "doc" : "Type inferred from '\"0xafd5497f6159ac6589fd1804d27fe05436ed13706e64002f0e82e93b471e1780\"'" }, { "name" : "s", "type" : "string", "doc" : "Type inferred from '\"0x76dc38410d35069d1a62c08a1976548fbb915d4c52d8c9789157149720b04a33\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0x7418b4B9327b2DD18AC90Ef2eF846b36F286adA4\"'" }, { "name" : "updated", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:36:55.000Z\"'" }, { "name" : "availableVolume", "type" : "string", "doc" : "Type inferred from '\"1.16278799351629411637773277515287799587448e+21\"'" }, { "name" : "ethAvailableVolume", "type" : "string", "doc" : "Type inferred from '\"1162.7879935162941\"'" }, { "name" : "availableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"520115069499838340\"'" }, { "name" : "ethAvailableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"0.5201150694998383\"'" }, { "name" : "amountFilled", "type" : "null", "doc" : "Type inferred from 'null'" } ] }




Example JSON Data

{"id":"237ab7693be71d35783941da9686f340d32b6d1e7332eedd0636b3e7b3725b93_sell","deleted":true,"amount":"-6000000000000000000","price":"0.02","tokenGet":"0x0000000000000000000000000000000000000000","amountGet":"120000000000000000","tokenGive":"0x219218f117dc9348b358b8471c55a073e5e0da0b","amountGive":"6000000000000000000","expires":"5594927","nonce":"4013451455","v":27,"r":"0x3965d8a9b074c6dcf25ebe10d39833f5ec6aa2d892aaec057dffd4368cd39f46","s":"0x01820eb919fe725c142f560c501742a89d41ca86b5594fdedc94f0e6f91bc97f","user":"0x0d4F98cb588c18FCC2695e2341112f066A915f80","updated":"2018-05-11T13:58:48.064Z","availableVolume":"798131841361720650","ethAvailableVolume":"0.7981318413617207","availableVolumeBase":"15962636827234412","ethAvailableVolumeBase":"0.01596263682723441","amountFilled":null}
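The numeric fields in this record arrive as strings, so downstream queries need to cast them. The sketch below uses Python's `decimal` module to show how some of the values in the sample appear to relate to each other: the `eth...` fields look like the raw integer amounts scaled down by 10^18, and for this sell order the price matches `amountGet / amountGive`. The 18-decimal scaling and both helper names are assumptions drawn from this one record, not from EtherDelta documentation.

```python
from decimal import Decimal

# Assumed scaling: 18 decimal places, as with wei to ether. Tokens may differ.
ASSUMED_DECIMALS = 18


def to_display_units(raw: str, decimals: int = ASSUMED_DECIMALS) -> Decimal:
    """Scale a raw integer amount (given as a string) down by 10**decimals."""
    return Decimal(raw).scaleb(-decimals)


def implied_sell_price(amount_get: str, amount_give: str) -> Decimal:
    """Price implied by a sell order: what is received per unit given."""
    return Decimal(amount_get) / Decimal(amount_give)


# Values copied from the example record above.
print(to_display_units("798131841361720650"))   # compare with ethAvailableVolume
print(to_display_units("15962636827234412"))    # compare with ethAvailableVolumeBase
print(implied_sell_price("120000000000000000", "6000000000000000000"))  # compare with price
```

Using `Decimal` rather than `float` avoids the rounding visible in the feed's own `eth...` fields, which matters when amounts have 18 or more significant digits.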

{
 "type": "record",
 "name": "ethereumfunds",
 "fields": [
  {
   "name": "txHash",
   "type": [
    "string",
    "null"
   ]
  },
  {
   "name": "date",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"2018-05-10T15:52:19.000Z\"'"
  },
  {
   "name": "tokenAddr",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"0xfb1e5f5e984c28ad7e228cdaa1f8a0919bb6a09b\"'"
  },
  {
   "name": "kind",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"Deposit\"'"
  },
  {
   "name": "user",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"0xb975cf6c40f9cc5ad5cb7a335f16bdaab6cdcf0d\"'"
  },
  {
   "name": "amount",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"5986.826\"'"
  },
  {
   "name": "balance",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"5986.826\"'"
  }
 ]
}
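The schema above differs from the inferred `ethereumfunds` schema mainly in that each field type became a union with `"null"`, so records with missing values still validate. A small Python sketch of that transformation follows; the function name is hypothetical, and leaving a pure `"null"` type untouched is a choice made for this illustration.

```python
import copy
import json


def make_nullable(schema: dict) -> dict:
    """Return a copy of an Avro record schema whose field types also accept null."""
    result = copy.deepcopy(schema)
    for field in result["fields"]:
        avro_type = field["type"]
        if avro_type == "null":
            continue  # already null-only, nothing to widen
        if isinstance(avro_type, list):
            if "null" not in avro_type:
                avro_type.append("null")  # extend an existing union in place
        else:
            field["type"] = [avro_type, "null"]
    return result


# A shortened stand-in for an inferred schema.
inferred = {
    "type": "record",
    "name": "ethereumfunds",
    "fields": [
        {"name": "txHash", "type": "string"},
        {"name": "amount", "type": "string"},
    ],
}

print(json.dumps(make_nullable(inferred), indent=1))
```

Note that if a field later gains a default value, Avro expects the default to match the first type in the union, so the order of the union members can matter.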





SQL Table DDL

We now have four tables full of different Ethereum Trades, Sells, Buys, and Funds.
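As a rough idea of how table DDL can be derived from one of the schemas, here is a Python sketch that maps Avro field types to Hive column types and emits a `CREATE EXTERNAL TABLE` statement. The ORC storage format, the HDFS location, the type mapping for `null`, and the function names are all assumptions for illustration; the DDL that NiFi actually generated may differ.

```python
# Basic Avro primitive to Hive type mapping; anything unknown falls back to STRING.
AVRO_TO_HIVE = {
    "string": "STRING",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
}


def hive_type(avro_type) -> str:
    """Pick a Hive type for an Avro type, unwrapping ["type", "null"] unions."""
    if isinstance(avro_type, list):
        non_null = [t for t in avro_type if t != "null"]
        avro_type = non_null[0] if non_null else "null"
    return AVRO_TO_HIVE.get(avro_type, "STRING")


def external_table_ddl(schema: dict, location: str) -> str:
    """Build a CREATE EXTERNAL TABLE statement (assumes ORC files at `location`)."""
    # Backticks guard column names such as `user` and `date` that clash with keywords.
    columns = ", ".join(
        f"`{f['name']}` {hive_type(f['type'])}" for f in schema["fields"]
    )
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {schema['name']} ({columns}) "
        f"STORED AS ORC LOCATION '{location}'"
    )


funds_schema = {
    "type": "record",
    "name": "ethereumfunds",
    "fields": [
        {"name": "txHash", "type": ["string", "null"]},
        {"name": "date", "type": ["string", "null"]},
        {"name": "amount", "type": ["string", "null"]},
    ],
}

# "/tmp/ethereumfunds" is a placeholder path, not the location used in the article.
print(external_table_ddl(funds_schema, "/tmp/ethereumfunds"))
```

Since the feed delivers amounts and prices as strings, the resulting columns are strings too; casting to numeric types would happen in the queries.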

Schemas in Hortonworks Schema Registry

Storing The Data

Run SQL on the Flows


Published at DZone with permission of
