DZone
The Latest Data Engineering Topics

Sequelize, the JavaScript ORM, in practice
Node.js is well known for its good connectivity with NoSQL databases. A less-known fact is that it's also very efficient with relational databases. Among the dozens of ORMs available in JavaScript, one stands out for relational databases: Sequelize. It's quite easy to learn, but there are not many pointers about how to organize model code with this module. Here are a few tips we learned by using Sequelize in a medium-size project.

Sequelize 101

Sequelize claims to support MySQL, PostgreSQL and SQLite. The Sequelize docs explain the first steps with the JavaScript ORM. First, initialize a database connection, then a few models, without worrying about primary or foreign keys:

    var Sequelize = require('sequelize');
    // the password argument is optional
    var sequelize = new Sequelize('database', 'username', 'password');

    var Project = sequelize.define('Project', {
      title: Sequelize.STRING,
      description: Sequelize.TEXT
    });
    var Task = sequelize.define('Task', {
      title: Sequelize.STRING,
      description: Sequelize.TEXT,
      deadline: Sequelize.DATE
    });
    Project.hasMany(Task);

Next, create new instances and persist them, query the database, etc.

    // create an instance
    var task = Task.build({ title: 'very important task' });
    task.title // ==> 'very important task'

    // persist an instance
    task.save()
      .error(function(err) {
        // error callback
      })
      .success(function() {
        // success callback
      });

    // query persistence for instances
    var tasks = Task.all({ where: ['deadline < ?', new Date()] })
      .error(function(err) {
        // error callback
      })
      .success(function() {
        // success callback
      });

Sequelize uses promises so you can chain error and success callbacks, and it all plays well with unit tests. All that is pretty well documented, but the Sequelize documentation only covers the basic usage. Once you start using Sequelize in real-world projects, finding the right way to implement a feature gets trickier.

Model file structure

All the examples in the Sequelize documentation show all model declarations grouped in a single file.
Once a project reaches production size, this is not a viable approach. The alternative is to import models from a module using sequelize.import(). The problem is that relationships rely on several models. When you declare a relationship, models from both sides of the relationship must already be imported. You should not import model files from other model files because of the Node.js module caching policy (more on that later on); instead, you can define relationships in a standalone file. Here is the file structure we've been working with:

    models/
      index.js         # imports all models and creates relationships
      PhoneNumber.js
      Task.js
      User.js
      ...

And here is how the main models/index.js initializes the entire model:

    var Sequelize = require('sequelize');
    var config = require('config').database; // we use node-config to handle environments

    // initialize database connection
    var sequelize = new Sequelize(
      config.name,
      config.username,
      config.password,
      config.options
    );

    // load models
    var models = [
      'PhoneNumber',
      'Task',
      'User'
    ];
    models.forEach(function(model) {
      module.exports[model] = sequelize.import(__dirname + '/' + model);
    });

    // describe relationships
    (function(m) {
      m.PhoneNumber.belongsTo(m.User);
      m.Task.belongsTo(m.User);
      m.User.hasMany(m.Task);
      m.User.hasMany(m.PhoneNumber);
    })(module.exports);

    // export connection
    module.exports.sequelize = sequelize;

Using models in code

From other parts of the application, if you need a model class, require models/index.js instead of the standalone model file. That way, you don't have to repeat the Sequelize initialization.

    var models = require('./models');
    var User = models.User;
    var user = User.build({ first_name: 'John', last_name: 'Doe' });

The problem is, when you require the models/index.js file, Node may use a cached version of the module... or not. From nodejs.org:

Multiple calls to require('foo') may not cause the module code to be executed multiple times. (...) Modules are cached based on their resolved filename.
Since modules may resolve to a different filename based on the location of the calling module (loading from node_modules folders), it is not a guarantee that require('foo') will always return the exact same object, if it would resolve to different files.

That means that using require('./models') to get the models may create more than one connection to the database. To avoid that, the models variable must be somehow singleton-esque. If you're using a framework like Express, this can be easily achieved by attaching the models module to the application:

    app.set('models', require('./models'));

And when you need a model class in a controller, use this application setting rather than a direct import:

    var User = app.get('models').User;

Accessing other models

Sequelize models can be extended with class and instance methods. You can add abilities to model classes, much like in a true ActiveRecord implementation. But a problem arises when adding a method that depends on another model: how can a model access another one?

    // in models/User.js
    module.exports = function(sequelize, DataTypes) {
      return sequelize.define('User', {
        first_name: DataTypes.STRING,
        last_name: DataTypes.STRING
      }, {
        instanceMethods: {
          countTasks: function() {
            // how to implement this method?
          }
        }
      });
    };

If the two models share a relationship, there is a way. Here, one User has many Tasks, which makes the Task model accessible through User.associations['Tasks'].target. And here is yet another problem: since Sequelize doesn't use prototype-based inheritance, how can a User instance gain access to the User class? Digging into the Sequelize source brings the protected __factory property to light.
With all this undocumented knowledge, it is now possible to write the countTasks() instance method:

    countTasks: function() {
      return this.__factory.associations['Tasks'].target.count({ where: { user_id: this.id } });
    }

Note that Task.count() returns a promise, so countTasks() also returns a promise:

    user.countTasks().success(function(nbTasks) {
      // do something with the user task count
    });

Extending models (a.k.a. behaviors)

What if you need to reuse several methods across several models? Sequelize doesn't have a behavior system per se (or "concerns" in Ruby on Rails terminology), although it's quite easy to implement. For now, you're condemned to import common methods before the call to sequelize.define() and use Sequelize.Utils._.extend() to add them to the instanceMethods or classMethods object:

    // in models/FriendlyUrl.js
    module.exports = function(keys) {
      return {
        getUrl: function() {
          var self = this; // keep the instance reachable inside the forEach callback
          var ret = '';
          keys.forEach(function(key) {
            ret += self[key];
          });
          return ret
            .toLowerCase()
            .replace(/^\s+|\s+$/g, '') // trim whitespace
            .replace(/[_|\s]+/g, '-')
            .replace(/[^a-z0-9-]+/g, '')
            .replace(/[-]+/g, '-')
            .replace(/^-+|-+$/g, '');
        }
      };
    };

    // in models/User.js
    var Sequelize = require('sequelize'); // needed for Sequelize.Utils
    var friendlyUrlMethods = require('./FriendlyUrl')(['first_name', 'last_name']);
    module.exports = function(sequelize, DataTypes) {
      return sequelize.define('User', {
        first_name: DataTypes.STRING,
        last_name: DataTypes.STRING
      }, {
        instanceMethods: Sequelize.Utils._.extend({}, friendlyUrlMethods, {
          countTasks: function() {
            return this.__factory.associations['Tasks'].target.count({ where: { user_id: this.id } });
          }
        })
      });
    };

Now the User model instances gain access to a getUrl() method:

    var user = User.build({ first_name: 'John', last_name: 'Doe' });
    user.getUrl(); // 'johndoe' (the keys are concatenated without a separator)

A limitation of this trick is that you must define behaviors before the actual model. This prevents behaviors from accessing other models.

Query series

Sequelize provides a tool called the QueryChainer to ease the resynchronization of queries.
    new Sequelize.Utils.QueryChainer()
      .add(User, 'find', [id])
      .add(Task, 'findAll')
      .error(function(err) { /* hmm not good :> */ })
      .success(function(results) {
        var user = results[0];
        var tasks = results[1];
        // do things with the results
      });

After using it a little, this utility turns out to be very limited. Most notably, QueryChainer executes all queries in parallel by default. And you only get access to the results of the queries in the final callback: there is no way to pass values from one query to the other. We've found it much more convenient to use a generic resynchronizing module like async, which provides the wonderful async.auto() utility. This method lets you list tasks together with their dependencies, and determines which tasks can be run in parallel and which must be run in series.

    // callback-first signature of the async versions available at the time;
    // later major versions of async pass (results, callback) instead
    async.auto({
      user: function(next) {
        User.find(id).complete(next);
      },
      tasks: ['user', function(next, results) {
        Task.findAll({ where: { user_id: results.user.id } }).complete(next);
      }]
    }, function(err, results) {
      var user = results.user;
      var tasks = results.tasks;
      // do things with the results
    });

Notice the complete() method, which is an alternative to the two success() and error() callbacks. complete() accepts a callback with the signature (err, res), which is more natural in the Node.js world, and compatible with async.

Prefetching

One thing ORMs are usually good at is minimizing queries. Sequelize offers a prefetching feature that groups two queries into a single one using a JOIN. For instance, if you want to retrieve a task together with the related user, write the query as follows:

    Task.find({ where: { id: id }, include: ['User'] })
      .error(function(err) {
        // error callback
      })
      .success(function(task) {
        task.getUser(); // does not trigger a new query
      });

This is another undocumented feature, although the documentation should be updated soon.

Migrations

Sequelize provides a migration command line utility.
But because it only allows modifying the model using Sequelize commands (and not calling any asynchronous method of your own), this migration command falls short. As of now, we've been handling migrations manually using numbered SQL files and a custom utility to run them in order.

Custom SQL queries

Sequelize is built over database adapters, and as such provides a way to execute arbitrary SQL queries against the database. Here is an example:

    var util = require('util');
    var query = 'SELECT * FROM `task` ' +
      'LEFT JOIN `user` ON `task`.`userid` = `user`.`id` ' +
      'WHERE `user`.`last_name` = %s';
    var escapedName = sequelize.constructor.Utils.escape(last_name);
    var queryWithParams = util.format(query, escapedName);
    sequelize.query(queryWithParams, Task)
      .error(function(err) {
        // error callback
      })
      .success(function(tasks) {
        // tasks are hydrated Task instances
      });

sequelize.query() returns a promise just like other query functions. If you provide the model to use for hydration (Task in this case), the query() method returns model instances rather than simple JSON. Note that you must escape values by hand before concatenating them into the SQL query. For strings, sequelize.constructor.Utils.escape() is your friend. For integers, util.format('%d') should do the trick.

Conclusion

Is Sequelize ready for prime time? Almost. The learning curve is made longer by incomplete documentation, but most of the features required by a production-level ORM are there. However, I wouldn't recommend it for production just yet if you're not ready to run on your own fork, since the rate at which PRs are merged in the Sequelize GitHub repository is low.
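As a rough illustration of the behavior idea from the "Extending models" section, here is a stand-alone sketch that runs without Sequelize. It is not the article's code: the buildUser helper is hypothetical, Object.assign stands in for Sequelize.Utils._.extend, and the keys are joined with a space (an assumption made here) so that the words end up separated by a dash.

```javascript
// Hypothetical stand-alone variant of the FriendlyUrl behavior (no Sequelize needed).
function friendlyUrlMethods(keys) {
  return {
    getUrl: function () {
      var self = this; // the object the method is mixed into
      return keys
        .map(function (key) { return String(self[key]); })
        .join(' ')                     // assumption: separate the keys with a space
        .toLowerCase()
        .replace(/^\s+|\s+$/g, '')     // trim whitespace
        .replace(/[_\s]+/g, '-')       // underscores and whitespace become dashes
        .replace(/[^a-z0-9-]+/g, '')   // drop everything else
        .replace(/-+/g, '-')           // collapse repeated dashes
        .replace(/^-+|-+$/g, '');      // trim leading and trailing dashes
    }
  };
}

// Hypothetical helper: mixes the behavior into a plain object that mimics a model instance.
function buildUser(attributes) {
  return Object.assign({}, attributes, friendlyUrlMethods(['first_name', 'last_name']));
}

var user = buildUser({ first_name: 'John', last_name: 'Doe' });
console.log(user.getUrl()); // john-doe
```

Because getUrl() only reads properties from `this`, the same factory can be mixed into any object that exposes the listed keys, which is what makes it reusable across models.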
March 5, 2013
by Francois Zaninotto
· 52,874 Views · 2 Likes
SAP Integration with Talend Components / Connectors (BAPI, RFC, IDoc, BW, SOAP)
Talend has several connectors to integrate SAP systems. However, this guide is not an introduction to Talend's SAP components. Instead, this guide helps you to:
  • understand different alternatives to integrate SAP systems with Talend
  • set up a local SAP system
  • configure Talend Studio for using SAP components
  • use Talend's SAP wizard
  • run a first Talend job which connects to SAP

All further required information and example use cases for Talend's SAP components should be available in the Talend component guide at www.help.talend.com. If that's not the case, please create a JIRA documentation ticket (https://jira.talendforge.org/browse/doct)! Now let's take a look at different alternatives for integration of SAP systems with Talend.

Alternatives for SAP integration

Three protocols exist for communication between SAP and external programs:
  • Dynamic Information and Action Gateway (DIAG): e.g. used by SAP GUI
  • Remote Function Call (RFC): a function call with input and output parameters (like a Java interface)
  • Hypertext Transfer Protocol (HTTP): internet standard

The following alternatives are available for integrating SAP systems using some of these protocols.

File

SAP supports the direct import of files (call-transaction-program, batch-input, direct input). Files have to be in a specific format to be imported. Transformation and integration can be realized with Talend's various file components such as tFileInputDelimited.

RFC

Remote Function Call is the proprietary SAP AG interface for communication between an SAP system and other SAP or third-party compatible systems over TCP/IP or CPI-C connections. Remote function calls may be associated with SAP software and ABAP programming, and provide a way for an external program (written in languages such as PHP, ASP, Java, C, or C++) to use data returned from the server. Data transactions are not limited to getting data from the server, but can insert data into server records as well. SAP can act as the client or server in an RFC call.
A Remote Function Call (RFC) is the call or remote execution of a remote function module in an external system. In the SAP system, these functions are provided by the RFC interface system. The RFC interface system enables function calls between two SAP systems, or between an SAP system and an external system. tSAPInput and tSAPOutput are Talend's components to use RFCs.

Business Application Programming Interface (BAPI)

A BAPI is an object-oriented view on most data and transactions of an SAP system (called "business objects"). Object types of the business objects are stored in the Business Object Repository (BOR). BAPIs are always implemented as RFCs and therefore can be called the same way. Additionally, they have the following characteristics (compared to RFCs):
  • stable interface
  • no view layer
  • no exceptions; instead an export parameter: "RETURN"

Most business objects offer the following standard BAPIs:
  • GetList
  • GetDetail
  • Change
  • CreateFromData

tSAPInput and tSAPOutput are Talend's components to use BAPIs.

Application Link Enabling (ALE)

Application Link Enabling (ALE) is used for asynchronous messaging between different systems via "Intermediate Documents (IDoc)". IDoc is an SAP document format for business transaction data transfers. It is used to realize distributed business processes. IDoc is similar to XML in purpose, but differs in syntax. Both serve the purpose of data exchange and automation in computer systems, but the IDoc technology takes a different approach. While XML allows having some metadata about the document itself, an IDoc is obligated to have information in its header such as its creator, creation time, etc. While XML has a tag-like tree structure containing data and metadata, IDocs use a table with the data and metadata. IDocs also have a session that explains all the processes which the document passed or will pass, allowing one to debug and trace the status of the document.
An IDoc consists of:
  • a control record (it contains the type of IDoc, port of the partner, release of SAP R/3 which produced the IDoc, etc.)
  • data records of different types. The number and type of segments is mostly fixed for each IDoc type, but there is some flexibility (for example an SD order can have any number of items).
  • status records containing messages such as 'IDoc created', 'The recipient exists', 'IDoc was successfully passed to the port', 'Could not book the invoice because...'

Different IDoc types are available to handle different types of messages. For example, the IDoc format ORDERS01 may be used for both purchase orders and order confirmations. tSAPIDocInput and tSAPIDocOutput are Talend's components to use ALE / IDoc. BAPIs can also be called asynchronously via ALE. All new IDocs are even based on BAPIs.

SOAP Web Services

SAP supports SOAP web services. Not just SAP AS Java, but also SAP AS ABAP! Integration can be realized with Talend's ESB / web service components such as tESBRequest, tESBResponse, or tESBConsumer.

Installation of SAP server and client

Installation can take about 6 to 8 hours, but it is an "all in one installation", i.e. you can run it overnight. Steps for installation:
  • Get yourself a Windows 7 64-bit laptop or VM with 8+ GB RAM and 50+ GB free disk space
  • Get an SAP Community account (free, just register): http://scn.sap.com/welcome
  • Download SAP NetWeaver (Software Downloads --> SAP NetWeaver Main Releases): http://www.sdn.sap.com/irj/scn/nw-downloads
  • Download the current version of SAP NetWeaver Application Server ABAP 64-bit Trial
  • Install the SAP server: follow the installation guide, an HTML website included in the root of the extracted download folder (start.htm --> there click on the "Installation" link)
  • Install SAP GUI (rich client frontend): start.htm --> there click on the "Install SAP GUI" link and follow the instructions
  • Download the SAP JCo for the operating system on which your connector is running.
    The SAP JCo is available for download from SAP's website at http://service.sap.com/connectors. You must have an SAPNet account to access the SAP JCo (if you do not already have one, contact your local SAP Basis administrator).

Usage of SAP server

Hint: You have to use a Windows user which has a password (as you need to enter Windows credentials when stopping SAP). If you have a Windows user without a password (for instance if you use Windows within a VM on your Mac), SAP cannot process these credentials (i.e. it cannot process an empty password field) --> change your Windows password before starting SAP.

  • Start the management console (Windows Start menu --> Programs --> SAP Management Console)
  • Start and stop the SAP server (right click on "NSP" --> Start / Stop)
  • Default user: SAP* (SAP system super user)
  • Password: the one which you entered at installation of SAP NetWeaver, e.g. admin123

Usage of SAP client

An SAP client should be used to get information about the SAP system (functions, data, etc.), similarly to using e.g. MySQL Workbench to get information from a MySQL database. SAP GUI (view layer) communicates with SAP AS ABAP (business logic layer). The application server communicates with the relational database (DB layer). Different clients are available for SAP:
  • SAP GUI Windows
  • SAP GUI Java
  • Web browser
  • External RFC program

For local development demos, SAP GUI Windows is probably the best alternative. Start SAP GUI Windows by:
  • clicking the shortcut "Windows Start menu --> SAP Frontend --> SAP Logon"
  • entering username and password
  • clicking Logon

SAP transactions

In SAP, you call SAP programs via SAP transaction codes. Important transaction codes are for example:
  • BAPI: BAPI Explorer, view all SAP BAPIs
  • SE16: Data Browser, view/add table data
  • SE38: Program editor

Here is a list of several other important transaction codes: http://www.sapdev.co.uk/tcodes/tcodes.htm

Installation of demo data

The SAP installation includes some demo data.
As most people do not want to install "real" SAP modules such as SAP FI, SAP CRM or SAP BI on their local system, this demo data is perfect for demos using Talend's SAP connectors. To install the flight demo on a local SAP system, you just have to open the ABAP editor (transaction: SE38) and execute the program SAPBC_DATA_GENERATOR. This program generates example data within the flight tables and does some further initializations. Here is a good tutorial with more information and how to test the flight application: http://help.sap.com/saphelp_erp60_sp/helpdata/de/db/7c623cf568896be10000000a11405a/content.htm

Configuration of Talend Studio to use SAP components

Talend's SAP components are already included in the Studio. However, a few further steps are required to be able to use them:
  • Copy sapjco3.dll to the directory c:/windows/system32
  • Add the SAP Java Connector JAR: copy sapjco3.jar to the directory "talend/studio/lib/java"
  • (Re-)start Talend Studio
  • Check if the SAP library was added successfully:
      • Open the view "Talend Modules" (Eclipse --> Windows --> Show View --> Talend --> Modules)
      • Sort by column "Context"
      • Look for "tSAP*" contexts and check if sapjco3.jar has the status "Installed"

Usage of SAP components with Talend Studio

This section describes how to use Talend's SAP components and the SAP wizard in general (using one specific example for calling a BAPI). Detailed descriptions of all SAP components (for using BAPIs, RFCs, IDocs, BW, etc.) are available in the documentation talend_components_rg_x.y.z.pdf at www.help.talend.com.

Connection to an SAP system

A connection to an SAP system can be done "built-in" or via "Metadata --> SAP Connections" (the latter only in the enterprise version).
Using the latter has several advantages:
  • Reuse of the connection configuration
  • Quick check if the connection to SAP works
  • Wizards for retrieving functions from SAP (instead of handwriting without wizard)
  • Quick test with test parameters if a function works before finishing development

Development lifecycle for an SAP job

Create connection (if not existing yet)
  • Right click on Metadata --> SAP Connection
  • Create SAP connection
  • Follow the wizard:
      • SAP JCo version: 3
      • Client: "001"
      • UserId: "SAP*"
      • Password: "admin123" --> as you defined it during installation
      • Language: "EN"
      • Hostname: "localhost"
      • System number: "00"

Retrieve function (BAPI / RFC)
  • Right click on the created connection
  • Click on "Retrieve SAP Function"
  • Enter a search filter (e.g. BAPI_FL*)
  • Click on "Search"
  • Select and double click on your function (e.g. BAPI_FLCUST_GETLIST)
  • You see all input, output and table parameters for this SAP function
  • Click on "Test in" --> here you see the parameters in more detail:
      • You now have to define which input and output parameters you want to use --> remove all others by selecting them and clicking the "Remove" button
      • Hint: If you do not remove an input parameter, you usually have to enter a value for it!
      • Select the output type: it can be a single (single record), a table (list of records), or a structure output
      • Hint: Difference between table and structure in SAP: http://www.sapfans.com/forums/viewtopic.php?f=12&t=119794
  • If you want to do a quick test: enter values for the input parameters (if there are any for your function call), then click the "Launch" button
      • In this example, there is only an optional input parameter MAX_ROWS
      • You should see data in the output fields
      • In this example, you see the record with CUSTNAME "SAP AG" and STREET "Neurottstr. 16"
  • Click the "Finish" button
  • Under "Metadata --> SAP Connections --> "your connection" --> SAP Functions", you can now see your function (in this example: BAPI_FLCUST_GETLIST)

Create SAP job
  • Drag and drop the created function into a job (without the wizard, you can also enter all data by hand)
  • The tSAPInput component is proposed automatically. Click OK to add it to your job
  • Go to "Initialize input" and add parameter values
      • In this example, there is just the parameter "MAX_ROWS"
      • Hint: The parameter value can be changed from a hardcoded value to a variable, of course (just press Ctrl+Space on your keyboard to get access to all available variables via code completion in your Studio)
  • Go to the tSAPInput component and add the desired output mapping (i.e. which values you want to process further with other components):
      • Scroll to the bottom to "Outputs"
      • Add the correct table / structure name (in this example: "CUSTOMER_LIST")
      • Click on "Mapping" (which is empty and has to be filled), then click on "…"
      • Add the wanted output columns of your SAP function
      • Add the same names in the column "Schema XPathQueries" (do not forget the double quotes here!)
      • Click the "OK" button
  • Connect the tSAPInput component to a tLogRow component and synchronize the schema
  • Hint: Always try out if this works before adding further logic to your job!
  • Run and test your job (you will see five rows logged, as you have configured MAX_ROWS = 5)

That's it. Now enjoy Talend's SAP components :-)

Best regards, Kai Wähner (Twitter: @kaiwaehner)

Content from my blog: http://www.kai-waehner.de/blog/2013/03/03/sap-integration-with-talend-components-connectors-bapi-rfc-idoc-bw-soap/
March 4, 2013
by Kai Wähner
· 32,891 Views · 1 Like
JUnit testing of Spring MVC application: Testing DAO layer
In continuation of my blog "JUnit testing of Spring MVC application – Introduction", in this blog I will show how to design and implement the DAO layer for the Bookstore Spring MVC web application using test-driven development (TDD). For people in a hurry, get the latest code from GitHub and run the command below:

    mvn clean test -Dtest=com.example.bookstore.repository.JpaBookRepositoryTest

As a part of TDD, first write the basic CRUD (create, read, update, delete) operations on a Book DAO class, com.example.bookstore.repository.JpaBookRepository, without any database wiring yet. Once we have built the JUnit tests, we use JPA as the persistence layer. We also use H2 as an in-memory database for testing purposes.

Create the Book POJO class, then create the JUnit test as below:

    public class JpaBookRepositoryTest {

        // bookRepository, book and category are fields that are wired up in the later steps

        @Test
        public void testFindById() {
            Book book = bookRepository.findById(this.book.getId());
            assertEquals(this.book.getAuthor(), book.getAuthor());
            assertEquals(this.book.getDescription(), book.getDescription());
            assertEquals(this.book.getIsbn(), book.getIsbn());
        }

        @Test
        public void testFindByCategory() {
            List<Book> books = bookRepository.findByCategory(category);
            assertEquals(1, books.size());
            for (Book book : books) {
                assertEquals(category.getId(), book.getCategory().getId());
                assertEquals(this.book.getAuthor(), book.getAuthor());
                assertEquals(this.book.getDescription(), book.getDescription());
                assertEquals(this.book.getIsbn(), book.getIsbn());
            }
        }

        @Test
        @Rollback(true)
        public void testStoreBook() {
            Book book = new BookBuilder() {
                {
                    description("Something");
                    author("JohnDoe");
                    title("John Doe's life");
                    isbn("1234567890123");
                    category(category);
                }
            }.build();
            bookRepository.storeBook(book);
            Book book1 = bookRepository.findById(book.getId());
            assertEquals(book1.getAuthor(), book.getAuthor());
            assertEquals(book1.getDescription(), book.getDescription());
            assertEquals(book1.getIsbn(), book.getIsbn());
        }
    }

Notice that, since JpaBookRepository is only a skeleton class without an implementation, all the tests will fail. As a next step, we need to create a configuration and wire a data source; for testing purposes we will use the H2 database. We also need to wire this back into the JUnit test, as below:

    @Configuration
    public class InfrastructureContextConfiguration {

        @Autowired
        private DataSource dataSource;

        // some more configurations..

        @Bean
        public DataSource dataSource() {
            EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
            builder.setType(EmbeddedDatabaseType.H2);
            return builder.build();
        }
    }

    // JUnit test wiring is as below
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = { InfrastructureContextConfiguration.class, TestDataContextConfiguration.class })
    @Transactional
    public class JpaBookRepositoryTest {
        // the test methods
    }

The next step is to set up and tear down sample data in the JUnit test case, as below:

    public class JpaBookRepositoryTest {

        @PersistenceContext
        private EntityManager entityManager;

        private Book book;
        private Category category;

        @Before
        public void setupData() {
            EntityBuilderManager.setEntityManager(entityManager);
            category = new CategoryBuilder() {
                {
                    name("Evolution");
                }
            }.build();
            book = new BookBuilder() {
                {
                    description("Richard Dawkins' brilliant reformulation of the theory of natural selection");
                    author("Richard Dawkins");
                    title("The Selfish Gene: 30th Anniversary Edition");
                    isbn("9780199291151");
                    category(category);
                }
            }.build();
        }

        @After
        public void tearDown() {
            EntityBuilderManager.clearEntityManager();
        }
    }

Once the wiring is done, we need to implement com.example.bookstore.repository.JpaBookRepository, using JPA to do the CRUD operations on the database, and run the tests. The tests will succeed. Finally, if you run Cobertura for this example from STS, you will get over 90% line coverage for com.example.bookstore.repository.JpaBookRepository. In case you want to try a few exercises, you can implement repositories for Account and User. I hope this blog helped you.
In my next blog I will talk about Mockito and implementing the service layer.
March 1, 2013
by Krishna Prasad
· 80,265 Views
MySQL optimizer: ANALYZE TABLE and Waiting for table flush
This post comes from Miguel Angel Nieto at the MySQL Performance Blog.

The MySQL optimizer decides which execution plan to use based on the information provided by the storage engines. In some engines, like InnoDB, that information is not exact: it is based on statistics calculations, so some tuning is sometimes needed. In InnoDB these statistics are calculated automatically; check the following blog post for more information: http://www.mysqlperformanceblog.com/2011/10/06/when-does-innodb-update-table-statistics-and-when-it-can-bite/

There are some variables to tune how those statistics are calculated, but we need to wait until the gathering process triggers again to see if there is any improvement. Usually the first step to try to get back to the previous execution plan is to force that process with ANALYZE TABLE, which is usually fast enough not to cause trouble. Let's see an example of how a simple and fast ANALYZE can cause downtime.

Waiting for table flush

In order to trigger this problem we need:
- A lot of concurrency
- A long-running query
- An ANALYZE TABLE run on a table accessed by the long-running query

So first we need a long-running query against table t:

    SELECT * FROM t WHERE c > '%c%';

Then, in our efforts to get a better execution plan for another query, we run ANALYZE TABLE:

    mysql> analyze table t;
    +--------+---------+----------+----------+
    | Table  | Op      | Msg_type | Msg_text |
    +--------+---------+----------+----------+
    | test.t | analyze | status   | OK       |
    +--------+---------+----------+----------+
    1 row in set (0.00 sec)

Perfect, very fast! But then some seconds later we realize that our application is down. Let's see the process list.
I've removed most of the columns to make it clearer:

    +------+-------------------------+---------------------------------+
    | Time | State                   | Info                            |
    +------+-------------------------+---------------------------------+
    | 32   | Writing to net          | select * from t where c > '%0%' |
    | 12   | Waiting for table flush | select * from test.t where i=1  |
    | 12   | Waiting for table flush | select * from test.t where i=2  |
    | 12   | Waiting for table flush | select * from test.t where i=3  |
    | 11   | Waiting for table flush | select * from test.t where i=7  |
    | 10   | Waiting for table flush | select * from test.t where i=11 |
    | 11   | Waiting for table flush | select * from test.t where i=5  |
    | 11   | Waiting for table flush | select * from test.t where i=4  |
    | 11   | Waiting for table flush | select * from test.t where i=9  |
    | 11   | Waiting for table flush | select * from test.t where i=8  |
    | 11   | Waiting for table flush | select * from test.t where i=12 |
    | 11   | Waiting for table flush | select * from test.t where i=14 |
    | 10   | Waiting for table flush | select * from test.t where i=6  |
    | 10   | Waiting for table flush | select * from test.t where i=15 |
    | 10   | Waiting for table flush | select * from test.t where i=10 |
    [...]

The ANALYZE TABLE runs perfectly, but afterwards the rest of the threads that are running a query against that table need to wait. This is because MySQL has detected that the underlying table has changed and it needs to close and reopen it using FLUSH. Therefore the table will be locked until all queries that are using that table finish. There are only two solutions to this situation: wait until the long query finishes, or kill the query. Also, we have to take into account that killing a query could cause even more trouble. If we are dealing with a write query on InnoDB, the rollback process could take even more time to finish than the original query. On the other hand, if the table is MyISAM there will be no rollback process, so the rows already updated can't be recovered. This particular example is not only a problem of ANALYZE.
Other commands like FLUSH TABLES, ALTER, RENAME, OPTIMIZE or REPAIR can cause threads to wait on “Waiting for tables”, “Waiting for table” and “Waiting for table flush”.

Conclusion

Before running ANALYZE TABLE or any of the other commands listed above, check the running queries. If the table you are going to work on is heavily used, the recommendation is to run the command during a period of low load or in a maintenance window.
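The advice to check the running queries first can be sketched as a small filter over process list rows. This is only an illustration: the function name, the `min_seconds` threshold, and the crude name-matching regex are assumptions made for this sketch, and the rows are plain dictionaries shaped like the trimmed process list shown earlier rather than the result of a real database call.

```python
import re

def queries_that_would_block(processlist, table, min_seconds=5):
    """Return rows for statements already running against `table` for a while.

    Hypothetical helper: `processlist` is a list of dicts with the Time,
    State and Info columns of SHOW PROCESSLIST. The table match is a crude
    word-boundary regex, so it can produce false positives.
    """
    pattern = re.compile(r"\b%s\b" % re.escape(table), re.IGNORECASE)
    blockers = []
    for row in processlist:
        info = row.get("Info") or ""
        # Threads that are already waiting are victims, not the cause.
        if row.get("State") == "Waiting for table flush":
            continue
        if row["Time"] >= min_seconds and pattern.search(info):
            blockers.append(row)
    return blockers

# Rows copied from the process list in the article.
rows = [
    {"Time": 32, "State": "Writing to net",
     "Info": "select * from t where c > '%0%'"},
    {"Time": 12, "State": "Waiting for table flush",
     "Info": "select * from test.t where i=1"},
]
blockers = queries_that_would_block(rows, "t")
```

If the returned list is not empty, it is probably safer to postpone the ANALYZE TABLE until those statements have finished.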
February 28, 2013
by Peter Zaitsev
· 12,062 Views
Neo4j/Cypher: Combining COUNT and COLLECT in One Query
Curator's Note: Check raw code snippets to see properly formatted tables. In my continued playing around with football data I wanted to write a cypher query against neo4j which would show me which teams had missed the most penalties this season and who missed them. I started off with a query that returned all the penalties that have been missed this season and the games those missed happened in: START player = node:players('name:*') MATCH player-[:missed_penalty_in]-game, player-[:played|subbed_on]-stats-[:in]-game, stats-[:for]-team, game-[:home_team]-home, game-[:away_team]-away RETURN player.name, team.name, home.name, away.name +-------------------------------------------------------------------------------------------------+ | player.name | team.name | home.name | away.name | +-------------------------------------------------------------------------------------------------+ | "Papiss Demba Cisse" | "Newcastle United" | "Newcastle United" | "Norwich City" | | "Wayne Rooney" | "Manchester United" | "Manchester United" | "Arsenal" | | "Mikel Arteta" | "Arsenal" | "Arsenal" | "Fulham" | | "David Silva" | "Manchester City" | "Manchester City" | "Southampton" | | "Frank Lampard" | "Chelsea" | "Manchester City" | "Chelsea" | | "Adel Taarabt" | "Queens Park Rangers" | "Queens Park Rangers" | "Norwich City" | | "Javier Hernández" | "Manchester United" | "Manchester United" | "Wigan Athletic" | | "Robin Van Persie" | "Manchester United" | "Southampton" | "Manchester United" | | "Jonathan Walters" | "Stoke City" | "Fulham" | "Stoke City" | | "Shane Long" | "West Bromwich Albion" | "West Bromwich Albion" | "Liverpool" | | "Steven Gerrard" | "Liverpool" | "Liverpool" | "West Bromwich Albion" | | "Lucas Piazon" | "Chelsea" | "Chelsea" | "Aston Villa" | +-------------------------------------------------------------------------------------------------+ 12 rows (there should actually be another penalty miss for Jonathan Walters against Chelsea but for some reason the data 
source has missed it off!)

I then grouped the penalty misses by team so that I’d have one row for each team and a collection showing the people who’d missed. We can use the COLLECT function to do the latter:

START player = node:players('name:*')
MATCH player-[:missed_penalty_in]-game,
      player-[:played|subbed_on]-stats-[:in]-game,
      stats-[:for]-team
RETURN DISTINCT team.name, COLLECT(player.name) AS players

I wanted to order the teams by the number of penalties they’d missed, so Manchester United would be first in the table in this case, and initially tried to order the results by a count of players:

START player = node:players('name:*')
MATCH player-[:missed_penalty_in]-game,
      player-[:played|subbed_on]-stats-[:in]-game,
      stats-[:for]-team
RETURN DISTINCT team.name, COLLECT(player.name) AS players
ORDER BY COUNT(player.name)

which doesn’t actually compile:

SyntaxException: Aggregation expressions must be listed in the RETURN clause to be used in ORDER BY

I tried a few other variations such as the following:

START player = node:players('name:*')
MATCH player-[:missed_penalty_in]-game,
      player-[:played|subbed_on]-stats-[:in]-game,
      stats-[:for]-team
RETURN DISTINCT team.name, COUNT(player.name) AS numberOfPlayers, COLLECT(player.name) AS players
ORDER BY numberOfPlayers DESC

which again doesn’t compile:

SyntaxException: Aggregation expressions must be listed in the RETURN clause to be used in ORDER BY

I eventually found a post by Andres where he explains that you need to split the query into two and make use of WITH if you want to use two aggregation expressions.
I ended up with the following query which does the job:

START player = node:players('name:*')
MATCH player-[:missed_penalty_in]-game,
      player-[:played|subbed_on]-stats-[:in]-game,
      stats-[:for]-team
WITH DISTINCT team, COLLECT(player.name) AS players
MATCH player-[:missed_penalty_in]-game,
      player-[:played|subbed_on]-stats-[:in]-game,
      stats-[:for]-team
WITH DISTINCT team, COUNT(player) AS numberOfPlayers, players
RETURN team.name, players
ORDER BY numberOfPlayers DESC

+---------------------------------------------------------------------------------+
| team.name              | players                                                |
+---------------------------------------------------------------------------------+
| "Manchester United"    | ["Wayne Rooney","Javier Hernández","Robin Van Persie"] |
| "Chelsea"              | ["Frank Lampard","Lucas Piazon"]                       |
| "Liverpool"            | ["Steven Gerrard"]                                     |
| "Manchester City"      | ["David Silva"]                                        |
| "Newcastle United"     | ["Papiss Demba Cisse"]                                 |
| "Queens Park Rangers"  | ["Adel Taarabt"]                                       |
| "Stoke City"           | ["Jonathan Walters"]                                   |
| "Arsenal"              | ["Mikel Arteta"]                                       |
| "West Bromwich Albion" | ["Shane Long"]                                         |
+---------------------------------------------------------------------------------+
9 rows
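To make the intent of the final query concrete, here is a rough plain-Python equivalent of "collect the players per team, count them, and order by the count". It is not Cypher and says nothing about how Neo4j evaluates the query; the function name is made up for this sketch, and the input rows are the (player, team) pairs from the first result table above.

```python
from collections import defaultdict

# (player.name, team.name) pairs taken from the first result table.
missed_penalties = [
    ("Papiss Demba Cisse", "Newcastle United"),
    ("Wayne Rooney", "Manchester United"),
    ("Mikel Arteta", "Arsenal"),
    ("David Silva", "Manchester City"),
    ("Frank Lampard", "Chelsea"),
    ("Adel Taarabt", "Queens Park Rangers"),
    ("Javier Hernández", "Manchester United"),
    ("Robin Van Persie", "Manchester United"),
    ("Jonathan Walters", "Stoke City"),
    ("Shane Long", "West Bromwich Albion"),
    ("Steven Gerrard", "Liverpool"),
    ("Lucas Piazon", "Chelsea"),
]

def misses_by_team(rows):
    """Group players by team (COLLECT) and sort by group size (COUNT ... DESC)."""
    players_by_team = defaultdict(list)
    for player, team in rows:
        players_by_team[team].append(player)
    # Teams with the same count keep their first-seen order here, which
    # need not match the order Neo4j returns for ties.
    return sorted(players_by_team.items(),
                  key=lambda item: len(item[1]),
                  reverse=True)

ranking = misses_by_team(missed_penalties)
```

The first entry of `ranking` is Manchester United with three players, followed by Chelsea with two, which matches the top of the table returned by the query.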
February 27, 2013
by Mark Needham
· 11,013 Views
Understanding TCP/IP Network Stack & Writing Network Apps
We cannot imagine Internet service without TCP/IP. All Internet services we have developed and used at NHN are built on a solid foundation, TCP/IP. Understanding how data is transferred via the network will help you improve performance through tuning, troubleshooting, or the introduction of a new technology. This article describes the overall operation of the network stack, based on data flow and control flow in Linux and in the hardware layer.

Key Characteristics of TCP/IP

How should a network protocol be designed to transmit data quickly while keeping the data in order and without any data loss? TCP/IP has been designed with this consideration. The following are the key characteristics of TCP/IP required to understand the concept of the stack.

TCP and IP
Technically, since TCP and IP belong to different layers, it would be correct to describe them separately. However, here we will describe them as one.

1. Connection-oriented
First, a connection is made between two endpoints (local and remote) and then data is transferred. Here, the "TCP connection identifier" is a combination of the addresses of the two endpoints, having the form <local IP address, local port number, remote IP address, remote port number>.

2. Bidirectional Byte Stream
Bidirectional data communication uses a byte stream.

3. In-order Delivery
A receiver receives data in the order in which the sender sent it. For that, the data must carry its order. To mark the order, a 32-bit integer is used.

4. Reliability through ACK
When a sender does not receive an ACK (acknowledgement) from the receiver after sending data, the sender TCP re-sends the data to the receiver. Therefore, the sender TCP buffers data that the receiver has not yet acknowledged.

5. Flow Control
A sender sends as much data as a receiver can afford. A receiver sends the maximum number of bytes that it can receive (unused buffer size, receive window) to the sender. The sender sends only as many bytes as the receiver's receive window allows.

6.
Congestion Control
The congestion window is used separately from the receive window to prevent network congestion by limiting the volume of data flowing in the network. As with the receive window, the sender sends only as many bytes as the congestion window allows; the size of that window is set by one of a variety of algorithms such as TCP Vegas, Westwood, BIC, and CUBIC. Unlike flow control, congestion control is implemented by the sender only.

Data Transmission

As indicated by its name, a network stack has many layers. Figure 1 shows the layer types.

Figure 1: Operation Process by Each Layer of TCP/IP Network Stack for Data Transmission.

There are several layers, and they are broadly classified into three areas:

User area
Kernel area
Device area

Tasks at the user area and the kernel area are performed by the CPU. The user area and the kernel area are called "host" to distinguish them from the device area. Here, the device is the Network Interface Card (NIC) that sends and receives packets. It is a more accurate term than the commonly used "LAN card".

Let's take a look at the user area. First, the application creates data to send (the "User data" box in Figure 1) and then calls the write() system call to send the data. Assume that the socket (fd in Figure 1) has already been created. When the system call is made, execution switches to the kernel area.

POSIX-series operating systems, including Linux and Unix, expose the socket to the application by using a file descriptor. In a POSIX-series operating system, the socket is a kind of file. The file layer performs a simple check and calls the socket function by using the socket structure connected to the file structure.

The kernel socket has two buffers: the send socket buffer for sending and the receive socket buffer for receiving.
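The point that a socket is exposed as a file descriptor, and that each kernel socket has a send buffer and a receive buffer, can be observed from user space. The following is a minimal sketch assuming a Unix-like OS; it uses a local socket pair instead of a real TCP connection so that it needs no network, which means the buffer sizes it reports belong to that local socket type rather than to TCP.

```python
import os
import socket

# A connected pair of stream sockets (assumption: Unix-like OS).
left, right = socket.socketpair()

# The socket is reachable through an ordinary file descriptor ...
fd = left.fileno()

# ... so the generic write() system call works on it, just as on a file.
bytes_written = os.write(fd, b"hello")
received = right.recv(16)

# The kernel keeps a send buffer and a receive buffer per socket.
send_buffer_size = left.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
receive_buffer_size = left.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)

left.close()
right.close()
```

The same `os.write` call on a descriptor of a connected TCP socket would follow the write path that this article walks through.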
When the write system call is called, the data in the user area is copied to kernel memory and then added to the end of the send socket buffer. This is to send data in order. In Figure 1, the light-gray box refers to the data in the socket buffer. Then, TCP is called.

A TCP Control Block (TCB) structure is connected to the socket. The TCB includes the data required for processing the TCP connection: the connection state (LISTEN, ESTABLISHED, TIME_WAIT), receive window, congestion window, sequence number, resending timer, etc.

If the current TCP state allows for data transmission, a new TCP segment (in other words, a packet) is created. If data transmission is impossible due to flow control or a similar reason, the system call ends here and the mode is returned to user mode (in other words, control is passed back to the application).

A TCP segment has two parts, as shown in Figure 2: the TCP header and the payload.

Figure 2: TCP Frame Structure (source).

The payload includes the unacknowledged data saved in the send socket buffer. The maximum length of the payload is the smallest of the receive window, the congestion window, and the maximum segment size (MSS).

Then, the TCP checksum is computed. In this checksum computation, pseudo header information (IP addresses, segment length, and protocol number) is included. One or more packets can be transmitted according to the TCP state.

In fact, since the current network stack uses checksum offload, the TCP checksum is computed by the NIC, not by the kernel. However, we assume that the kernel computes the TCP checksum for convenience.

The created TCP segment goes down to the IP layer. The IP layer adds an IP header to the TCP segment and performs IP routing. IP routing is the procedure of searching for the next hop IP in order to reach the destination IP.

After the IP layer has computed and added the IP header checksum, it sends the data to the Ethernet layer.
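The checksum step mentioned above, with the pseudo header (IP addresses, segment length, and protocol number) included, can be sketched as follows. This is an illustrative software version of the standard Internet checksum over an IPv4 pseudo header, not the kernel's code; the function names, addresses, ports, and header values are made up for the example.

```python
import socket
import struct

TCP_PROTOCOL_NUMBER = 6

def ones_complement_sum(data: bytes) -> int:
    """Add the data as 16-bit big-endian words, folding carries back in."""
    if len(data) % 2:
        data += b"\x00"  # pad odd-length input with a zero byte
    total = sum(word for (word,) in struct.iter_unpack("!H", data))
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)
    return total

def pseudo_header(src_ip: str, dst_ip: str, segment_length: int) -> bytes:
    """IPv4 pseudo header: addresses, zero byte, protocol, segment length."""
    return (socket.inet_aton(src_ip) + socket.inet_aton(dst_ip)
            + struct.pack("!BBH", 0, TCP_PROTOCOL_NUMBER, segment_length))

def tcp_checksum(src_ip: str, dst_ip: str, segment: bytes) -> int:
    """Checksum of a segment whose checksum field is currently zero."""
    covered = pseudo_header(src_ip, dst_ip, len(segment)) + segment
    return ~ones_complement_sum(covered) & 0xFFFF

# A 20-byte TCP header (checksum field zero) followed by a small payload.
header = struct.pack("!HHIIBBHHH", 40000, 80, 1, 0, 5 << 4, 0x18, 65535, 0, 0)
segment = header + b"hello"
checksum = tcp_checksum("192.0.2.1", "192.0.2.2", segment)

# The checksum field occupies bytes 16 and 17 of the TCP header.
filled = segment[:16] + struct.pack("!H", checksum) + segment[18:]
```

A receiver can validate the segment by summing the pseudo header and the filled segment again; a valid segment yields 0xFFFF.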
The Ethernet layer searches for the MAC address of the next hop IP by using the Address Resolution Protocol (ARP). It then adds the Ethernet header to the packet. The host packet is completed by adding the Ethernet header. After IP routing is performed, the transmit interface (NIC) is known as the result of IP routing. The interface is used for transmitting a packet to the next hop IP and the IP. Therefore, the transmit NIC driver is called. At this time, if a packet capture program such as tcpdump or Wireshark is running, the kernel copies the packet data onto the memory buffer that the program uses. In that way, the receiving packet is directly captured on the driver. Generally, the traffic shaper function is implemented to run on this layer. The driver requests packet transmission according to the driver-NIC communication protocol defined by the NIC manufacturer. After receiving the packet transmission request, the NIC copies the packets from the main memory to its memory and then sends it to the network line. At this time, by complying with the Ethernet standard, it adds the IFG (Inter-Frame Gap), preamble, and CRC to the packet. The IFG and preamble are used to distinguish the start of the packet (as a networking term, framing), and the CRC is used to protect the data (the same purpose as TCP and IP checksum). Packet transmission is started based on the physical speed of the Ethernet and the condition of Ethernet flow control. It is like getting the floor and speaking in a conference room. When an NIC sends a packet, the NIC generates interrupts on the host CPU. Every interrupt has its own interrupt number and the OS searches an adequate driver to handle the interrupt by using the number. The driver registers a function to handle the interrupt (an interrupt handler) when the driver is started. The OS calls the interrupt handler and then the interrupt handler returns the transmitted packet to the OS. 
So far we have discussed the procedure of data transmission through the kernel and the device when an application performs write. However, without a direct write request from the application, the kernel can transmit a packet by directly calling TCP. For example, when an ACK is received and the receive window is expanded, the kernel creates a TCP segment including the data left in the socket buffer and sends the TCP segment to the receiver. Data Receiving Now, let's take a look at how data is received. Data receiving is a procedure for how the network stack handles a packet coming in. Figure 3 shows how the network stack handles a packet received. Figure 3: Operation Process by Each Layer of TCP/IP Network Stack for Handling Data Received. First, the NIC writes the packet onto its memory. It checks whether the packet is valid by performing the CRC check and then sends the packet to the memory buffer of the host. This buffer is a memory that has already been requested by the driver to the kernel and allocated for receiving packets. After the buffer has been allocated, the driver tells the memory address and size to the NIC. When there is no host memory buffer allocated by the driver even though the NIC receives a packet, the NIC may drop the packet. After sending the packet to the host memory buffer, the NIC sends an interrupt to the host OS. Then, the driver checks whether it can handle the new packet or not. So far, the driver-NIC communication protocol defined by the manufacturer is used. When the driver should send a packet to the upper layer, the packet must be wrapped in a packet structure that the OS uses for the OS to understand the packet. For example, sk_buff of Linux, mbuf of BSD-series kernel, and NET_BUFFER_LIST of Microsoft Windows are the packet structures of the corresponding OS. The driver sends the wrapped packets to the upper layer. The Ethernet layer checks whether the packet is valid and then de-multiplexes the upper protocol (network protocol). 
At this time, it uses the ethertype value of the Ethernet header. The IPv4 ethertype value is 0x0800. It removes the Ethernet header and then sends the packet to the IP layer.

The IP layer also checks whether the packet is valid. In other words, it checks the IP header checksum. It then performs IP routing logically to determine whether the local system should handle the packet or whether the packet should be sent on to another system. If the packet must be handled by the local system, the IP layer de-multiplexes the upper protocol (transport protocol) by referring to the proto value of the IP header. The TCP proto value is 6. It removes the IP header and then sends the packet to the TCP layer.

Like the lower layers, the TCP layer checks whether the packet is valid. It also checks the TCP checksum. As mentioned before, since the current network stack uses checksum offload, the TCP checksum is computed by the NIC, not by the kernel.

Then it searches for the TCP control block to which the packet belongs. At this time, the <source IP, source port, destination IP, destination port> of the packet is used as an identifier. After finding the connection, it performs the protocol to handle the packet. If it has received new data, it adds the data to the receive socket buffer. According to the TCP state, it can send a new TCP packet (for example, an ACK packet). Now TCP/IP receiving packet handling has completed.

The size of the receive socket buffer is the TCP receive window. Up to a certain point, the TCP throughput increases as the receive window gets larger. In the past, the socket buffer size was adjusted in the application or in the OS configuration. The latest network stack has a function to adjust the receive socket buffer size, i.e., the receive window, automatically.

When the application calls the read system call, execution switches to the kernel area and the data in the socket buffer is copied to memory in the user area. The copied data is removed from the socket buffer. And then the TCP is called.
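The de-multiplexing steps on the receive path (ethertype 0x0800 selects IPv4, proto value 6 selects TCP, and the connection is then looked up by its addresses and ports) can be sketched as below. This is a simplified illustration that skips all validity checks; the function name, the dictionary standing in for the connection table, and the sample frame are assumptions made for this sketch.

```python
import socket
import struct

ETHERTYPE_IPV4 = 0x0800
IP_PROTO_TCP = 6

def connection_key(frame: bytes):
    """Return (src IP, src port, dst IP, dst port) for an IPv4/TCP frame, else None."""
    _dst_mac, _src_mac, ethertype = struct.unpack("!6s6sH", frame[:14])
    if ethertype != ETHERTYPE_IPV4:
        return None
    ip_packet = frame[14:]                      # "remove" the Ethernet header
    header_length = (ip_packet[0] & 0x0F) * 4   # IHL is counted in 32-bit words
    if ip_packet[9] != IP_PROTO_TCP:            # proto field of the IP header
        return None
    src_ip = socket.inet_ntoa(ip_packet[12:16])
    dst_ip = socket.inet_ntoa(ip_packet[16:20])
    tcp_segment = ip_packet[header_length:]     # "remove" the IP header
    src_port, dst_port = struct.unpack("!HH", tcp_segment[:4])
    return (src_ip, src_port, dst_ip, dst_port)

# Build a sample frame: Ethernet header + minimal IPv4 header + TCP ports.
ethernet = struct.pack("!6s6sH", b"\x02" * 6, b"\x04" * 6, ETHERTYPE_IPV4)
ip_header = struct.pack("!BBHHHBBH4s4s", 0x45, 0, 40, 0, 0, 64, IP_PROTO_TCP, 0,
                        socket.inet_aton("198.51.100.7"),
                        socket.inet_aton("192.0.2.10"))
tcp_ports = struct.pack("!HH", 51000, 80)
frame = ethernet + ip_header + tcp_ports + bytes(16)

# A dict stands in for the table of TCP control blocks.
connections = {("198.51.100.7", 51000, "192.0.2.10", 80): "control block A"}
control_block = connections.get(connection_key(frame))
```

A frame with a different ethertype or proto value yields `None`, which corresponds to handing the packet to another protocol handler.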
The TCP increases the receive window because there is new space in the socket buffer. And it sends a packet according to the protocol status. If no packet is transferred, the system call is terminated. Network Stack Development Direction The functions of network stack layers described so far are the most basic functions. The network stack in the early 1990s had few more functions than the functions described above. However, the latest network stack has many more functions and complexity as the network stack implementation structure gets higher. The latest network stack is classified by purpose as follows. Packet Processing Procedure Manipulation It is a function like Netfilter (firewall, NAT) and traffic control. By inserting the user-controllable code to the basic processing flow, the function can work differently according to the user configuration. Protocol Performance It aims to improve the throughput, latency, and stability that the TCP protocol can achieve within the given network environment. Various congestion control algorithms and additional TCP functions such as SACK are the typical examples. The protocol improvement will not be discussed here since it is out of the scope. Packet Processing Efficiency The packet processing efficiency aims to improve the maximum number of packets that can be processed per second by reducing the CPU cycle, memory usage, and memory accesses that one system consumes to process packets. There have been several attempts to reduce the latency in the system. The attempts include stack parallel processing, header prediction, zero-copy, single-copy, checksum offload, TSO, LRO, RSS, etc. Control Flow in the Stack Now, we will take a more detailed look at the internal flow of the Linux network stack. Like a subsystem which is not a network stack, a network stack basically runs as the event-driven way that reacts when the event occurs. Therefore, there is no separated thread to execute the stack. 
Figure 1 and Figure 3 showed simplified diagrams of the control flow. Figure 4 below illustrates the control flow more exactly.

Figure 4: Control Flow in the Stack.

At Flow (1) in Figure 4, an application calls a system call to execute (use) the TCP. For example, it calls the read system call or the write system call, which then executes TCP. However, there is no packet transmission.

Flow (2) is the same as Flow (1), except that packet transmission is required after executing TCP. It creates a packet and sends the packet down to the driver. A queue is in front of the driver. The packet comes into the queue first, and then the queue implementation decides when to send the packet to the driver. This is the queue discipline (qdisc) of Linux. The function of Linux traffic control is to manipulate the qdisc. The default qdisc is a simple First-In-First-Out (FIFO) queue. By using another qdisc, operators can achieve various effects such as artificial packet loss, packet delay, transmission rate limit, etc. At Flow (1) and Flow (2), the process thread of the application also executes the driver.

Flow (3) shows the case in which a timer used by the TCP has expired. For example, when the TIME_WAIT timer has expired, the TCP is called to delete the connection.

Like Flow (3), Flow (4) is the case in which a timer used by the TCP has expired, but here the packet resulting from TCP execution must be transmitted. For example, when the retransmit timer has expired, the packet for which no ACK has been received is transmitted again.

Flow (3) and Flow (4) show the procedure of executing the timer softirq that has processed the timer interrupt.

When the NIC driver receives an interrupt, it frees the transmitted packet. In most cases, execution of the driver is terminated here. Flow (5) is the case of packet accumulation in the transmit queue. The driver requests softirq and the softirq handler executes the transmit queue to send the accumulated packet to the driver.
When the NIC driver receives an interrupt and finds a newly received packet, it requests softirq. The softirq that processes the received packet calls the driver and transmits the received packet to the upper layer. In Linux, processing the received packet as shown above is called New API (NAPI). It is similar to polling because the driver does not directly transmit the packet to the upper layer, but the upper layer directly gets the packet. The actual code is called NAPI poll or poll. Flow (6) shows the case that completes execution of TCP, and Flow (7) shows the case that requires additional packet transmission. All of Flow (5), (6), and (7) are executed by the softirq which has processed the NIC interrupt. How to Process Interrupt and Received Packet Interrupt processing is complex; however, you need to understand the performance issue related to processing of packets received. Figure 5 shows the procedure of processing an interrupt. Figure 5: Processing Interrupt, softirq, and Received Packet. Assume that the CPU 0 is executing an application program (user program). At this time, the NIC receives a packet and generates an interrupt for the CPU 0. Then the CPU executes the kernel interrupt (called irq) handler. This handler refers to the interrupt number and then calls the driver interrupt handler. The driver frees the packet transmitted and then calls the napi_schedule() function to process the received packet. This function requests the softirq (software interrupt). After execution of the driver interrupt handler has been terminated, the control is passed to the kernel handler. The kernel handler executes the interrupt handler for the softirq. After the interrupt context has been executed, the softirq context will be executed. The interrupt context and the softirq context are executed by an identical thread. However, they use different stacks. And, the interrupt context blocks hardware interrupts; however, the softirq context allows for hardware interrupts. 
The softirq handler that processes the received packet is the net_rx_action() function. This function calls the poll() function of the driver. The poll() function calls the netif_receive_skb() function and then sends the received packets one by one to the upper layer. After processing the softirq, the application restarts execution from the stopped point in order to request a system call.

Therefore, the CPU that has received the interrupt processes the received packets from the first to the last. In Linux, BSD, and Microsoft Windows, the processing procedure is basically the same in this respect.

When you check the server CPU utilization, you can sometimes see that only one of the server CPUs is busy executing the softirq. This phenomenon occurs because of the way received packets are processed, as explained so far. To solve the problem, multi-queue NIC, RSS, and RPS have been developed.

Data Structure

The following are some key data structures. Take a look at them and review the code.

sk_buff structure

First, there is the sk_buff structure or skb structure that represents a packet. Figure 6 shows part of the sk_buff structure. As functions have been added, it has become more complicated. However, the basic functions are ones that anyone could think of.

Figure 6: Packet Structure sk_buff.

Including Packet Data and meta data
The structure directly includes the packet data or refers to it by using a pointer. In Figure 6, part of the packet (from Ethernet to buffer) is referenced through the data pointer, and the additional data (frags) refers to the actual page.

The necessary information such as header and payload length is saved in the meta data area. For example, in Figure 6, the mac_header, the network_header, and the transport_header hold pointers to the starting positions of the Ethernet header, IP header, and TCP header, respectively. This makes TCP protocol processing easy.
How to Add or Delete a Header The header is added or deleted as up and down each layer of the network stack. Pointers are used for more efficient processing. For example, to remove the Ethernet header, just increase the head pointer. How to Combine and Divide Packet The linked list is used for efficient execution of tasks such as adding or deleting packet payload data to the socket buffer, or packet chain. The next pointer and the prev pointer are used for this purpose. Quick Allocation and Free As a structure is allocated whenever creating a packet, the quick allocator is used. For example, if data is transmitted at the speed of 10-Gigabit Ethernet, more than one million packets per second must be created and deleted. TCP Control Block Second, there is a structure that represents the TCP connection. Previously, it was abstractly called a TCP control block. Linux uses tcp_sock for the structure. In Figure 7, you can see the relationship among the file, the socket, and the tcp_sock. Figure 7: TCP Connection Structure. When a system call has occurred, it searches the file in the file descriptor used by the application that has called the system call. For the Unix-series OS, the socket, the file and the device for general file system for storage are abstracted to a file. Therefore, the file structure includes the least information. For a socket, a separate socket structure saves the socket-related information and the file refers to the socket as a pointer. The socket refers to the tcp_sock again. The tcp_sock is classified into sock, inet_sock, etc to support various protocols except TCP. It may be considered as a kind of polymorphism. All status information used by the TCP protocol is saved in the tcp_sock. For example, the sequence number, receive window, congestion control, and retransmit timer are saved in the tcp_sock. The send socket buffer and the receive socket buffer are the sk_buff lists and they include the tcp_sock. 
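The idea from "How to Add or Delete a Header" above, that headers are added or removed by moving a pointer rather than by copying the packet, can be modeled in a few lines. This is a toy model and not the kernel's sk_buff: the class, its method names, and the `data_offset` field (an integer standing in for the pointer the text describes) are assumptions made for the illustration.

```python
class PacketBuffer:
    """Toy packet buffer with reserved headroom in front of the payload."""

    def __init__(self, headroom: int, payload: bytes):
        self.buffer = bytearray(headroom) + bytearray(payload)
        self.data_offset = headroom  # where the current packet starts

    def push(self, header: bytes) -> None:
        """Add a header by moving the start backwards into the headroom."""
        if len(header) > self.data_offset:
            raise ValueError("not enough headroom")
        self.data_offset -= len(header)
        self.buffer[self.data_offset:self.data_offset + len(header)] = header

    def pull(self, length: int) -> bytes:
        """Remove a header by moving the start forwards; the payload is not copied."""
        if length > len(self.buffer) - self.data_offset:
            raise ValueError("header longer than packet")
        header = bytes(self.buffer[self.data_offset:self.data_offset + length])
        self.data_offset += length
        return header

    def packet(self) -> bytes:
        return bytes(self.buffer[self.data_offset:])

# Transmit direction: each layer prepends its header on the way down.
pkt = PacketBuffer(headroom=8, payload=b"payload")
pkt.push(b"TCP_")
pkt.push(b"IP__")
on_the_wire = pkt.packet()

# Receive direction: each layer strips its header on the way up.
stripped = pkt.pull(4)
after_ip = pkt.packet()
```

Only the offset changes in `push` and `pull`; the payload bytes stay where they were written.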
The dst_entry, the IP routing result, is referred to in order to avoid too frequent routing. The dst_entry allows for easy search of the ARP result, i.e., the destination MAC address. The dst_entry is part of the routing table. The structure of the routing table is very complex that it will not be discussed in this document. The NIC to be used for packet transmission is searched by using the dst_entry. The NIC is expressed as the net_device structure. Therefore, by searching just the file, it is very easy to find all structures (from the file to the driver) required to process the TCP connection with the pointer. The size of the structures is the memory size used by one TCP connection. The memory size is a few KBs (excluding the packet data). As more functions have been added, the memory usage has been gradually increased. Finally, let's see the TCP connection lookup table. It is a hash table used to search the TCP connection where the received packet belongs. The hash value is calculated by using the input data of of the packet and the Jenkins hash algorithm. It is told that the hash function has been selected by considering defense against attacks to the hash table. Following Code: How to Transmit Data We will check the key tasks performed by the stack by following the actual Linux kernel source code. Here, we will observe two paths which are frequently used. First, this is a path used to transmit data when an application calls the write system call. SYSCALL_DEFINE3(write, unsigned int, fd, const char __user *, buf, ...) { struct file *file; [...] file = fget_light(fd, &fput_needed); [...] ===> ret = filp->f_op->aio_write(&kiocb, &iov, 1, kiocb.ki_pos); struct file_operations { [...] ssize_t (*aio_read) (struct kiocb *, const struct iovec *, ...) ssize_t (*aio_write) (struct kiocb *, const struct iovec *, ...) [...] }; static const struct file_operations socket_file_ops = { [...] .aio_read = sock_aio_read, .aio_write = sock_aio_write, [...] 
}; When the application calls the write system call, the kernel performs the write() function of the file layer. First, the actual file structure of the file descriptor fd is fetched. And then the aio_write is called. This is the function pointer. In the file structure, you will see the file_operations structure pointer. The structure is generally called function table and includes the function pointers such as aio_read and aio_write. The actual table for the socket is socket_file_ops. The aio_write function used by the socket is sock_aio_write. The function table is used for the purpose that is similar to the Java interface. It is generally used for the kernel to perform code abstraction or refactoring. static ssize_t sock_aio_write(struct kiocb *iocb, const struct iovec *iov, ..) { [...] struct socket *sock = file->private_data; [...] ===> return sock->ops->sendmsg(iocb, sock, msg, size); struct socket { [...] struct file *file; struct sock *sk; const struct proto_ops *ops; }; const struct proto_ops inet_stream_ops = { .family = PF_INET, [...] .connect = inet_stream_connect, .accept = inet_accept, .listen = inet_listen, .sendmsg = tcp_sendmsg, .recvmsg = inet_recvmsg, [...] }; struct proto_ops { [...] int (*connect) (struct socket *sock, ...) int (*accept) (struct socket *sock, ...) int (*listen) (struct socket *sock, int len); int (*sendmsg) (struct kiocb *iocb, struct socket *sock, ...) int (*recvmsg) (struct kiocb *iocb, struct socket *sock, ...) [...] }; The sock_aio_write() function gets the socket structure from the file and then calls sendmsg. It is also the function pointer. The socket structure includes the proto_ops function table. The proto_ops implemented by the IPv4 TCP is inet_stream_ops and the sendmsg is implemented by tcp_sendmsg. int tcp_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t size) { struct sock *sk = sock->sk; struct iovec *iov; struct tcp_sock *tp = tcp_sk(sk); struct sk_buff *skb; [...] 
    mss_now = tcp_send_mss(sk, &size_goal, flags);

    /* Ok commence sending. */
    iovlen = msg->msg_iovlen;
    iov = msg->msg_iov;
    copied = 0;
    [...]
    while (--iovlen >= 0) {
        int seglen = iov->iov_len;
        unsigned char __user *from = iov->iov_base;

        iov++;
        while (seglen > 0) {
            int copy = 0;
            int max = size_goal;
            [...]
            skb = sk_stream_alloc_skb(sk, select_size(sk, sg), sk->sk_allocation);
            if (!skb)
                goto wait_for_memory;
            /*
             * Check whether we can use HW checksum.
             */
            if (sk->sk_route_caps & NETIF_F_ALL_CSUM)
                skb->ip_summed = CHECKSUM_PARTIAL;
            [...]
            skb_entail(sk, skb);
            [...]
            /* Where to copy to? */
            if (skb_tailroom(skb) > 0) {
                /* We have some space in skb head. Superb! */
                if (copy > skb_tailroom(skb))
                    copy = skb_tailroom(skb);
                if ((err = skb_add_data(skb, from, copy)) != 0)
                    goto do_fault;
    [...]
    if (copied)
        tcp_push(sk, flags, mss_now, tp->nonagle);
    [...]
}

tcp_sendmsg gets tcp_sock (i.e., the TCP control block) from the socket and copies the data that the application has requested to transmit to the send socket buffer. When copying data to sk_buff, how many bytes will one sk_buff include? One sk_buff copies and includes MSS (tcp_send_mss) bytes to help the code that actually creates packets. Maximum Segment Size (MSS) stands for the maximum payload size that one TCP packet includes. By using TSO and GSO, one sk_buff can hold more data than the MSS. This will be discussed later, not in this document.

The sk_stream_alloc_skb function creates a new sk_buff, and skb_entail adds the new sk_buff to the tail of the send socket buffer. The skb_add_data function copies the actual application data to the data buffer of the sk_buff. All the data is copied by repeating the procedure (creating an sk_buff and adding it to the send socket buffer) several times. Therefore, sk_buffs of MSS size are in the send socket buffer as a list. Finally, tcp_push is called to turn the data that can be transmitted now into packets, and the packets are sent.
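The copy loop described above, in which application data ends up in the send socket buffer as a list of MSS-sized pieces, can be reduced to the following sketch. It deliberately ignores details that the kernel code handles, such as filling leftover tail room in the last sk_buff, memory limits, and TSO/GSO; the function name and the plain list used as the send buffer are assumptions made for the illustration.

```python
def queue_for_send(send_buffer: list, data: bytes, mss: int) -> list:
    """Append `data` to `send_buffer` in chunks of at most `mss` bytes."""
    if mss <= 0:
        raise ValueError("mss must be positive")
    for offset in range(0, len(data), mss):
        # Each chunk plays the role of one sk_buff's payload.
        send_buffer.append(data[offset:offset + mss])
    return send_buffer

send_buffer = queue_for_send([], b"x" * 3000, mss=1460)
chunk_sizes = [len(chunk) for chunk in send_buffer]
```

With 3000 bytes of data and an MSS of 1460, the buffer holds two full chunks followed by one of 80 bytes, in the order in which they must be sent.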
static inline void tcp_push(struct sock *sk, int flags, int mss_now, ...) [...] ===> static int tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle, ...) { struct tcp_sock *tp = tcp_sk(sk); struct sk_buff *skb; [...] while ((skb = tcp_send_head(sk))) { [...] cwnd_quota = tcp_cwnd_test(tp, skb); if (!cwnd_quota) break; if (unlikely(!tcp_snd_wnd_test(tp, skb, mss_now))) break; [...] if (unlikely(tcp_transmit_skb(sk, skb, 1, gfp))) break; /* Advance the send_head. This one is sent out. * This call will increment packets_out. */ tcp_event_new_data_sent(sk, skb); [...] The tcp_push function transmits, in sequence, as many of the sk_buffs in the send socket buffer as TCP allows. First, tcp_send_head is called to get the first sk_buff in the socket buffer, and then tcp_cwnd_test and tcp_snd_wnd_test check whether the congestion window and the receive window of the receiving TCP allow new packets to be transmitted. Then the tcp_transmit_skb function is called to create a packet. static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it, gfp_t gfp_mask) { const struct inet_connection_sock *icsk = inet_csk(sk); struct inet_sock *inet; struct tcp_sock *tp; [...] if (likely(clone_it)) { if (unlikely(skb_cloned(skb))) skb = pskb_copy(skb, gfp_mask); else skb = skb_clone(skb, gfp_mask); if (unlikely(!skb)) return -ENOBUFS; } [...] skb_push(skb, tcp_header_size); skb_reset_transport_header(skb); skb_set_owner_w(skb, sk); /* Build TCP header and checksum it. */ th = tcp_hdr(skb); th->source = inet->inet_sport; th->dest = inet->inet_dport; th->seq = htonl(tcb->seq); th->ack_seq = htonl(tp->rcv_nxt); [...] icsk->icsk_af_ops->send_check(sk, skb); [...] err = icsk->icsk_af_ops->queue_xmit(skb); if (likely(err <= 0)) return err; tcp_enter_cwr(sk, 1); return net_xmit_eval(err); } tcp_transmit_skb creates a copy of the given sk_buff (pskb_copy).
At this time, it does not copy the entire data of the application but the metadata. And then it calls skb_push to secure the header area and records the header field value. Send_check computes the TCP checksum. With the checksum offload, the payload data is not computed. Finally, queue_xmit is called to send the packet to the IP layer. Queue_xmit for IPv4 is implemented by the ip_queue_xmit function. int ip_queue_xmit(struct sk_buff *skb) [...] rt = (struct rtable *)__sk_dst_check(sk, 0); [...] /* OK, we know where to send it, allocate and build IP header. */ skb_push(skb, sizeof(struct iphdr) + (opt ? opt->optlen : 0)); skb_reset_network_header(skb); iph = ip_hdr(skb); *((__be16 *)iph) = htons((4 << 12) | (5 << 8) | (inet->tos & 0xff)); if (ip_dont_fragment(sk, &rt->dst) && !skb->local_df) iph->frag_off = htons(IP_DF); else iph->frag_off = 0; iph->ttl = ip_select_ttl(inet, &rt->dst); iph->protocol = sk->sk_protocol; iph->saddr = rt->rt_src; iph->daddr = rt->rt_dst; [...] res = ip_local_out(skb); [...] ===> int __ip_local_out(struct sk_buff *skb) [...] ip_send_check(iph); return nf_hook(NFPROTO_IPV4, NF_INET_LOCAL_OUT, skb, NULL, skb_dst(skb)->dev, dst_output); [...] ===> int ip_output(struct sk_buff *skb) { struct net_device *dev = skb_dst(skb)->dev; [...] skb->dev = dev; skb->protocol = htons(ETH_P_IP); return NF_HOOK_COND(NFPROTO_IPV4, NF_INET_POST_ROUTING, skb, NULL, dev, ip_finish_output, [...] ===> static int ip_finish_output(struct sk_buff *skb) [...] if (skb->len > ip_skb_dst_mtu(skb) && !skb_is_gso(skb)) return ip_fragment(skb, ip_finish_output2); else return ip_finish_output2(skb); The ip_queue_xmit function executes tasks required by the IP layers. __sk_dst_check checks whether the cached route is valid. If there is no cached route or the cached route is invalid, it performs IP routing. And then it calls skb_push to secure the IP header area and records the IP header field value. 
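The line in ip_queue_xmit that writes the first 16 bits of the IP header packs three fields at once: the version (4), the header length in 32-bit words (5, i.e. 20 bytes without options), and the TOS byte. As a small worked example, the following Python sketch reproduces that arithmetic; the helper name `first_ip_header_word` and the TOS value are hypothetical and chosen only for illustration.

```python
import struct


def first_ip_header_word(tos: int) -> bytes:
    """Pack version=4, IHL=5 and the TOS byte in network byte order.

    Mirrors htons((4 << 12) | (5 << 8) | (tos & 0xff)) from the listing.
    """
    word = (4 << 12) | (5 << 8) | (tos & 0xFF)
    return struct.pack("!H", word)  # "!" selects network (big-endian) order


raw = first_ip_header_word(tos=0x10)
version = raw[0] >> 4               # upper nibble of the first byte
header_bytes = (raw[0] & 0x0F) * 4  # IHL counts 32-bit words
print(raw.hex(), version, header_bytes)  # 4510 4 20
```

The familiar 0x45 that starts most IPv4 packets in a capture is exactly this version and header length pair.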
After that, following the call chain, ip_send_check computes the IP header checksum and the netfilter hook is called. If IP fragmentation is needed, the ip_finish_output function creates the IP fragments. No fragmentation occurs when TCP is used, so ip_finish_output2 is called, which adds the Ethernet header. At this point the packet is complete. int dev_queue_xmit(struct sk_buff *skb) [...] ===> static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q, ...) [...] if (...) { .... } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) && qdisc_run_begin(q)) { [...] if (sch_direct_xmit(skb, q, dev, txq, root_lock)) { [...] ===> int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, ...) [...] HARD_TX_LOCK(dev, txq, smp_processor_id()); if (!netif_tx_queue_frozen_or_stopped(txq)) ret = dev_hard_start_xmit(skb, dev, txq); HARD_TX_UNLOCK(dev, txq); [...] } int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev, ...) [...] if (!list_empty(&ptype_all)) dev_queue_xmit_nit(skb, dev); [...] rc = ops->ndo_start_xmit(skb, dev); [...] } The completed packet is transmitted through the dev_queue_xmit function. First, the packet passes through the qdisc. If the default qdisc is used and the queue is empty, the sch_direct_xmit function is called to send the packet directly down to the driver, skipping the queue. The dev_hard_start_xmit function calls the actual driver. Before the driver is called, the device TX is locked so that several threads cannot access the device simultaneously. Because the kernel locks the device TX, the driver's transmission code does not need an additional lock. This is closely related to parallel processing, which will be discussed next time. The ndo_start_xmit function pointer calls the driver code. Just before it, you will see ptype_all and dev_queue_xmit_nit. ptype_all is a list of modules such as packet capture. If a capture program is running, the packet is copied to that program through ptype_all.
Therefore, the packet that tcpdump shows is the packet transmitted to the driver. When checksum offload or TSO is used, the NIC manipulates the packet. So the tcpdump packet is different from the packet transmitted to the network line. After completing packet transmission, the driver interrupt handler returns the sk_buff. Following Code: How to Receive Data The general executed path is to receive a packet and then to add the data to the receive socket buffer. After executing the driver interrupt handler, follow the napi poll handle first. static void net_rx_action(struct softirq_action *h) { struct softnet_data *sd = &__get_cpu_var(softnet_data); unsigned long time_limit = jiffies + 2; int budget = netdev_budget; void *have; local_irq_disable(); while (!list_empty(&sd->poll_list)) { struct napi_struct *n; [...] n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list); if (test_bit(NAPI_STATE_SCHED, &n->state)) { work = n->poll(n, weight); trace_napi_poll(n); } [...] } int netif_receive_skb(struct sk_buff *skb) [...] ===> static int __netif_receive_skb(struct sk_buff *skb) { struct packet_type *ptype, *pt_prev; [...] __be16 type; [...] list_for_each_entry_rcu(ptype, &ptype_all, list) { if (!ptype->dev || ptype->dev == skb->dev) { if (pt_prev) ret = deliver_skb(skb, pt_prev, orig_dev); pt_prev = ptype; } } [...] type = skb->protocol; list_for_each_entry_rcu(ptype, &ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) { if (ptype->type == type && (ptype->dev == null_or_dev || ptype->dev == skb->dev || ptype->dev == orig_dev)) { if (pt_prev) ret = deliver_skb(skb, pt_prev, orig_dev); pt_prev = ptype; } } if (pt_prev) { ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev); static struct packet_type ip_packet_type __read_mostly = { .type = cpu_to_be16(ETH_P_IP), .func = ip_rcv, [...] }; As mentioned before, the net_rx_action function is the softirq handler that receives a packet. 
First, the driver that has requested the napi poll is retrieved from the poll_list and the poll handler of the driver is called. The driver wraps the received packet with sk_buff and then calls netif_receive_skb. When there is a module that requests all packets, the netif_receive_skb sends packets to the module. Like packet transmission, the packets are transmitted to the module registered to the ptype_all list. The packets are captured here. Then, the packets are transmitted to the upper layer based on the packet type. The Ethernet packet includes 2-byte ethertype field in the header. The value indicates the packet type. The driver records the value in sk_buff(skb->protocol). Each protocol has its own packet_type structure and registers the pointer of the structure to the ptype_base hash table. IPv4 uses ip_packet_type. The Type field value is the IPv4 ethertype (ETH_P_IP) value. Therefore, the IPv4 packet calls the ip_rcv function. int ip_rcv(struct sk_buff *skb, struct net_device *dev, ...) { struct iphdr *iph; u32 len; [...] iph = ip_hdr(skb); [...] if (iph->ihl < 5 || iph->version != 4) goto inhdr_error; if (!pskb_may_pull(skb, iph->ihl*4)) goto inhdr_error; iph = ip_hdr(skb); if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl))) goto inhdr_error; len = ntohs(iph->tot_len); if (skb->len < len) { IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INTRUNCATEDPKTS); goto drop; } else if (len < (iph->ihl*4)) goto inhdr_error; [...] return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL, ip_rcv_finish); [...] ===> int ip_local_deliver(struct sk_buff *skb) [...] if (ip_hdr(skb)->frag_off & htons(IP_MF | IP_OFFSET)) { if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER)) return 0; } return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL, ip_local_deliver_finish); [...] ===> static int ip_local_deliver_finish(struct sk_buff *skb) [...] __skb_pull(skb, ip_hdrlen(skb)); [...] int protocol = ip_hdr(skb)->protocol; int hash, raw; const struct net_protocol *ipprot; [...] 
hash = protocol & (MAX_INET_PROTOS - 1); ipprot = rcu_dereference(inet_protos[hash]); if (ipprot != NULL) { [...] ret = ipprot->handler(skb); [...] ===> static const struct net_protocol tcp_protocol = { .handler = tcp_v4_rcv, [...] }; The ip_rcv function executes the tasks required by the IP layer. It validates packet fields such as the length and the header checksum. After passing through the netfilter code, it calls the ip_local_deliver function, which reassembles IP fragments if required. Then it calls ip_local_deliver_finish through the netfilter code. The ip_local_deliver_finish function removes the IP header by using __skb_pull and then looks up the upper-layer protocol whose value matches the protocol field of the IP header. Similar to ptype_base, each transport protocol registers its own net_protocol structure in inet_protos. IPv4 TCP uses tcp_protocol and calls tcp_v4_rcv, which has been registered as its handler. When packets reach the TCP layer, the packet processing flow varies depending on the TCP state and the packet type. Here, we will look at the processing procedure when the expected next data packet is received while the TCP connection is in the ESTABLISHED state. This path is executed frequently by a server that receives data when there is no packet loss or out-of-order delivery. int tcp_v4_rcv(struct sk_buff *skb) { const struct iphdr *iph; struct tcphdr *th; struct sock *sk; [...] th = tcp_hdr(skb); if (th->doff < sizeof(struct tcphdr) / 4) goto bad_packet; if (!pskb_may_pull(skb, th->doff * 4)) goto discard_it; [...] th = tcp_hdr(skb); iph = ip_hdr(skb); TCP_SKB_CB(skb)->seq = ntohl(th->seq); TCP_SKB_CB(skb)->end_seq = (TCP_SKB_CB(skb)->seq + th->syn + th->fin + skb->len - th->doff * 4); TCP_SKB_CB(skb)->ack_seq = ntohl(th->ack_seq); TCP_SKB_CB(skb)->when = 0; TCP_SKB_CB(skb)->flags = iph->tos; TCP_SKB_CB(skb)->sacked = 0; sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest); [...] 
ret = tcp_v4_do_rcv(sk, skb); First, the tcp_v4_rcv function validates the received packet. If the data offset is smaller than the minimum TCP header size (th->doff < sizeof(struct tcphdr) / 4), the header is invalid. Then __inet_lookup_skb is called to find the connection that the packet belongs to in the TCP connection hash table. From the sock structure that is found, all the required structures such as tcp_sock and socket can be obtained. int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb) [...] if (sk->sk_state == TCP_ESTABLISHED) { /* Fast path */ sock_rps_save_rxhash(sk, skb->rxhash); if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) { [...] ===> int tcp_rcv_established(struct sock *sk, struct sk_buff *skb, [...] /* * Header prediction. */ if ((tcp_flag_word(th) & TCP_HP_BITS) == tp->pred_flags && TCP_SKB_CB(skb)->seq == tp->rcv_nxt && !after(TCP_SKB_CB(skb)->ack_seq, tp->snd_nxt)) { [...] if ((int)skb->truesize > sk->sk_forward_alloc) goto step5; NET_INC_STATS_BH(sock_net(sk), LINUX_MIB_TCPHPHITS); /* Bulk data transfer: receiver */ __skb_pull(skb, tcp_header_len); __skb_queue_tail(&sk->sk_receive_queue, skb); skb_set_owner_r(skb, sk); tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq; [...] if (!copied_early || tp->rcv_nxt != tp->rcv_wup) __tcp_ack_snd_check(sk, 0); [...] step5: if (th->ack && tcp_ack(sk, skb, FLAG_SLOWPATH) < 0) goto discard; tcp_rcv_rtt_measure_ts(sk, skb); /* Process urgent data. */ tcp_urg(sk, skb, th); /* step 7: process the segment text */ tcp_data_queue(sk, skb); tcp_data_snd_check(sk); tcp_ack_snd_check(sk); return 0; [...] } The actual protocol processing starts in the tcp_v4_do_rcv function. If the TCP connection is in the ESTABLISHED state, tcp_rcv_established is called. The ESTABLISHED state is handled separately and optimized because it is the most common state. tcp_rcv_established first executes the header prediction code. Header prediction is itself a cheap check that quickly detects the common case.
The common case here is that there is no data to transmit and the received data packet is the packet that must be received next time, i.e., the sequence number is the sequence number that the receiving TCP expects. In this case, the procedure is completed by adding the data to the socket buffer and then transmitting ACK. Go forward and you will see the sentence comparing truesize with sk_forward_alloc. It is to check whether there is any free space in the receive socket buffer to add new packet data. If there is, header prediction is "hit" (prediction succeeded). Then __skb_pull is called to remove the TCP header. After that, __skb_queue_tail is called to add the packet to the receive socket buffer. Finally, __tcp_ack_snd_check is called for transmitting ACK if necessary. In this way, packet processing is completed. If there is not enough free space, a slow path is executed. The tcp_data_queue function newly allocates the buffer space and adds the data packet to the socket buffer. At this time, the receive socket buffer size is automatically increased if possible. Different from the quick path, tcp_data_snd_check is called to transmit a new data packet if possible. Finally, tcp_ack_snd_check is called to create and transmit the ACK packet if necessary. The amount of code executed by the two paths is not much. This is accomplished by optimizing the common case. In other words, it means that the uncommon case will be processed significantly more slowly. The out-of-order delivery is one of the uncommon cases. How to Communicate between Driver and NIC Communication between a driver and the NIC is the bottom of the stack and most people do not care about it. However, the NIC is executing more and more tasks to solve the performance issue. Understanding the basic operation scheme will help you understand the additional technology. A driver and the NIC asynchronously communicate. 
First, a driver requests packet transmission (call) and the CPU performs another task without waiting for the response. Then the NIC sends the packets and notifies the CPU, and the driver returns the packets that have been sent (returns the result). Like packet transmission, packet receiving is asynchronous. First, a driver requests packet receiving and the CPU performs another task (call). Then the NIC receives packets and notifies the CPU, and the driver processes the received packets (returns the result). Therefore, a space to save the request and the response is necessary. In most cases, the NIC uses a ring structure. The ring is similar to a common queue structure. It has a fixed number of entries, and each entry holds one request or one response. The entries are used sequentially, in turn. The name "ring" is used because the fixed entries are reused in turn. Following the packet transmission procedure shown in Figure 8, you can see how the ring is used. Figure 8: Driver-NIC Communication: How to Transmit Packet. The driver receives packets from the upper layer and creates a send descriptor that the NIC can understand. By default, the send descriptor includes the packet size and the memory address. Because the NIC needs the physical address to access memory, the driver must translate the virtual address of the packets into a physical address. Then it adds the send descriptor to the TX ring (1). The TX ring is the send descriptor ring. Next, it notifies the NIC of the new request (2). The driver writes the data directly to a specific NIC memory address. This method, in which the CPU directly sends data to the device, is called Programmed I/O (PIO). The notified NIC fetches the send descriptor of the TX ring from the host memory (3). Since the device accesses memory directly without intervention of the CPU, this access is called Direct Memory Access (DMA).
After getting the send descriptor, the NIC determines the packet address and size and then fetches the actual packets from the host memory (4). With checksum offload, the NIC computes the checksum while it reads the packet data from memory, so the extra overhead is small. The NIC sends the packets (5) and then writes the number of packets sent to the host memory (6). Then it raises an interrupt (7). The driver reads the number of packets sent and releases the packets that have been sent so far. Figure 9 shows the procedure for receiving packets. Figure 9: Driver-NIC Communication: How to Receive Packets. First, the driver allocates a host memory buffer for receiving packets and then creates a receive descriptor. By default, the receive descriptor includes the buffer size and the memory address. As with the send descriptor, the driver stores the physical address that DMA uses in the receive descriptor. Then it adds the receive descriptor to the RX ring (1). This is the receive request, and the RX ring is the receive request ring. Through PIO, the driver notifies the NIC that there is a new descriptor (2). The NIC fetches the new descriptor from the RX ring and saves the size and location of the buffer given in the descriptor to NIC memory (3). After packets have been received (4), the NIC writes them to the host memory buffer (5). If checksum offload is available, the NIC computes the checksum at this time. The actual size of the received packets, the checksum result, and any other information are saved in a separate ring (the receive return ring) (6). The receive return ring holds the result of processing the receive request, i.e., the response. Then the NIC raises an interrupt (7). The driver gets the packet information from the receive return ring and processes the received packets. If necessary, it allocates new memory buffers and repeats steps (1) and (2).
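The ring used in both procedures can be sketched as a fixed-size circular queue. The Python class below is a simplified model, not driver code: the class name `DescriptorRing`, its method names, and the example addresses are all hypothetical, and a real ring lives in DMA-capable memory shared with the device rather than in a Python list.

```python
class DescriptorRing:
    """Fixed-size ring: the driver posts descriptors, the NIC consumes them."""

    def __init__(self, size: int):
        self.entries = [None] * size
        self.size = size
        self.head = 0   # next slot the driver fills
        self.tail = 0   # next slot the NIC processes
        self.count = 0  # descriptors posted but not yet completed

    def post(self, address: int, length: int) -> bool:
        """Driver side: add a descriptor; return False when the ring is full."""
        if self.count == self.size:
            return False
        self.entries[self.head] = (address, length)
        self.head = (self.head + 1) % self.size  # wrap around: entries are reused
        self.count += 1
        return True

    def complete(self):
        """NIC side: take the oldest descriptor, or None if the ring is empty."""
        if self.count == 0:
            return None
        descriptor = self.entries[self.tail]
        self.entries[self.tail] = None
        self.tail = (self.tail + 1) % self.size
        self.count -= 1
        return descriptor


ring = DescriptorRing(size=2)
print(ring.post(0x1000, 1500), ring.post(0x2000, 1500), ring.post(0x3000, 1500))
# True True False  (the third request does not fit)
print(ring.complete())          # (4096, 1500)
print(ring.post(0x3000, 1500))  # True, the freed entry is reused
```

A post that returns False models a full ring: the driver cannot issue another request until the NIC has completed an earlier one.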
To tune the stack, most people say that the ring and interrupt settings should be adjusted. When the TX ring is large, many send requests can be made at once. When the RX ring is large, many packets can be received at once. A large ring is useful for workloads that have huge bursts of packet transmission or receiving. In most cases, the NIC uses a timer to reduce the number of interrupts, since processing interrupts can impose a large overhead on the CPU. To avoid flooding the host system with too many interrupts, interrupts are collected and sent at regular intervals (interrupt coalescing) while packets are being sent and received. Stack Buffer and Flow Control Flow control is executed in several stages in the stack. Figure 10 shows the buffers used to transmit data. First, an application creates data and adds it to the send socket buffer. If there is no free space in the buffer, the system call fails or the application thread blocks. Therefore, the rate at which application data flows into the kernel must be controlled by using the socket buffer size limit. Figure 10: Buffers Related to Packet Transmission. TCP creates packets and sends them to the driver through the transmit queue (qdisc). It is a typical FIFO queue, and its maximum length is the value of txqueuelen, which can be checked by executing the ifconfig command. Generally, it is thousands of packets. The TX ring sits between the driver and the NIC. As mentioned before, it acts as a transmission request queue. If there is no free space in the queue, no transmission request is made and the packets accumulate in the transmit queue. If too many packets accumulate, packets are dropped. The NIC saves the packets to transmit in its internal buffer. The packet rate out of this buffer is limited by the physical rate (e.g., a 1 Gb/s NIC cannot offer 10 Gb/s performance).
And with the Ethernet flow control, packet transmission is stopped if there is no free space in the receive NIC buffer. When the packet rate from the kernel is faster than the packet rate from the NIC, packets are accumulated in the buffer of the NIC. If there is no free space in the buffer, processing of transmission request from the TX ring is stopped. More and more requests are accumulated in the TX ring and finally there is no free space in the queue. The driver cannot make any transmission request and the packets are accumulated in the transmit queue. Like this, backpressure is sent from the bottom to the top through many buffers. Figure 11 shows the buffers that the receive packets are passing. The packets are saved in the receive buffer of the NIC. From the view of flow control, the RX ring between the driver and the NIC is considered as a packet buffer. The driver gets packets coming into the RX ring and then sends them to the upper layer. There is no buffer between the driver and the upper layer since the NIC driver that is used by the server system uses NAPI by default. Therefore, it can be considered as the upper layer directly gets packets from the RX ring. The payload data of packets is saved in the receive socket buffer. The application gets the data from the socket buffer later. Figure 11: Buffers Related to Packet Receiving. The driver that does not support NAPI saves packets in the backlog queue. Later, the NAPI handler gets packets. Therefore, the backlog queue can be considered as a buffer between the upper layer and the driver. If the packet processing rate of the kernel is slower than the packet flow rate into the NIC, the RX ring space is full. And the space of the buffer in the NIC is full, too. When the Ethernet flow control is used, the NIC sends a request to stop transmission to the transmission NIC or makes the packet drop. 
There is no packet drop due to lack of space in the receive socket buffer because the TCP supports end-to-end flow control. However, packet drop occurs due to lack of space in the socket buffer when the application rate is slow because the UDP does not support flow control. The sizes of the TX ring and the RX ring used by the driver in Figure 10 and Figure 11 are the sizes of the rings shown by the ethtool. For most workloads which regard throughput as important, it will be helpful to increase the ring size and the socket buffer size. Increasing the sizes reduces the possibility of failures caused by lack of space in the buffer while receiving and transmitting a lot of packets at a fast rate. Conclusion Initially, I planned to explain only the things that would be helpful for you to develop network programs, execute performance tests, and perform troubleshooting. In spite of my initial plan, the amount of description included in this document is not small. I hope this document will help you to develop network applications and monitor their performance. The TCP/IP protocol itself is very complicated and has many exceptions. However, you don't need to understand every line of TCP/IP-related code of the OS to understand performance and analyze the phenomena. Just understanding its context will be very helpful for you. With continuous advancement of system performance and implementation of the OS network stack, the latest server can offer 10-20 Gb/s TCP throughput without any problem. These days, there are too many technology types related to performance, such as TSO, LRO, RSS, GSO, GRO, UFO, XPS, IOAT, DDIO, and TOE, just like alphabet soup, to make us confused. In the next article, I will explain about the network stack from the performance perspective and discuss the problems and effects of this technology. By Hyeongyeop Kim, Senior Engineer at Performance Engineering Lab, NHN Corporation.
February 27, 2013
by Esen Sagynov
Using the Libjars Option with Hadoop
When working with MapReduce, one of the challenges you encounter early on is determining how to make your third-party JARs available to the map and reduce tasks. One common approach is to create a fat JAR, which is a JAR that contains your classes as well as your third-party classes (see this Cloudera blog post for more details). A more elegant solution is to take advantage of the libjars option in the hadoop jar command, also mentioned in the Cloudera post at a high level. Here I’ll go into detail on the three steps required to make this work. Add libjars to the options It can be confusing to know exactly where to put libjars when running the hadoop jar command. The following example shows the correct position of this option: $ export LIBJARS=/path/jar1,/path/jar2 $ hadoop jar my-example.jar com.example.MyTool -libjars ${LIBJARS} -mytoolopt value It’s worth noting in the above example that the JARs supplied as the value of the libjars option are comma-separated, and not separated by your O.S. path delimiter (which is how a Java classpath is delimited). You may think that you’re done, but often this step alone is not enough. Read on for more details! Make sure your code is using GenericOptionsParser The Java class that’s being supplied to the hadoop jar command should use the GenericOptionsParser class to parse the options being supplied on the CLI. The easiest way to do that is demonstrated with the following code, which leverages the ToolRunner class to parse out the options: public static void main(final String[] args) throws Exception { Configuration conf = new Configuration(); int res = ToolRunner.run(conf, new com.example.MyTool(), args); System.exit(res); } It is crucial that the configuration object being passed into the ToolRunner.run method is the same one that you’re using when setting up your job.
To guarantee this, your class should use the getConf() method defined in Configurable (and implemented in Configured) to access the configuration: public class SmallFilesMapReduce extends Configured implements Tool { public final int run(final String[] args) throws Exception { Job job = new Job(super.getConf()); ... job.waitForCompletion(true); return ...; } } If you don’t leverage the Configuration object supplied to the ToolRunner.run method in your MapReduce driver code, then your job won’t be correctly configured and your third-party JARs won’t be copied to the Distributed Cache or loaded in the remote task JVMs. It’s the ToolRunner.run method (which delegates the command parsing to GenericOptionsParser) that parses out the libjars argument and adds a value for the tmpjars property to the Configuration object. So a quick way to make sure that this step is working is to look at the job file for your MapReduce job (there’s a link when viewing the job details from the JobTracker), and make sure that the tmpjars configuration name exists with a value identical to the path that you specified in your command. You can also use the command line to search for the libjars configuration in HDFS: $ hadoop fs -cat /_logs/history/*.xml | grep tmpjars Use HADOOP_CLASSPATH to make your third-party JARs available on the client-side So far the first two steps tackled what you needed to do to make your third-party JARs available to the remote map and reduce task JVMs. But what hasn’t been covered so far is making these same JARs available to the client JVM, which is the JVM that’s created when you run the hadoop jar command. For this to happen, you should set the HADOOP_CLASSPATH environment variable to contain the O.S. path-delimited list of third-party JARs.
Let’s extend the commands in the first step above with the addition of setting the HADOOP_CLASSPATH environment variable: $ export LIBJARS=/path/jar1,/path/jar2 $ export HADOOP_CLASSPATH=/path/jar1:/path/jar2 $ hadoop jar my-example.jar com.example.MyTool -libjars ${LIBJARS} -mytoolopt value Note that the value for HADOOP_CLASSPATH uses the Unix path delimiter (:), so modify it accordingly for your platform. And if you don’t like the copy-paste above, you could modify that line to replace the commas with colons: $ export HADOOP_CLASSPATH=`echo ${LIBJARS} | sed s/,/:/g`
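If the job is launched from a wrapper script rather than typed by hand, the same comma-to-delimiter conversion can be done portably. The following Python sketch is one possible way to do it; the function name `libjars_to_classpath` is hypothetical, and it assumes that no JAR path itself contains a comma.

```python
import os


def libjars_to_classpath(libjars: str, sep: str = os.pathsep) -> str:
    """Turn a comma-separated -libjars value into a classpath string.

    os.pathsep is ':' on Unix-like systems and ';' on Windows.
    """
    jars = [jar.strip() for jar in libjars.split(",") if jar.strip()]
    return sep.join(jars)


print(libjars_to_classpath("/path/jar1,/path/jar2", sep=":"))
# /path/jar1:/path/jar2
```

Deriving HADOOP_CLASSPATH from the libjars value this way keeps the two lists from drifting apart when a JAR is added or removed.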
February 26, 2013
by Alex Holmes
Text Processing, Part 2: Oh, Inverted Index
This is the second part of my text processing series. In this blog, we'll look into how text documents can be stored in a form that can be easily retrieved by a query. I'll use the popular open source Apache Lucene index for illustration. There are two main processing flows in the system ... Document indexing: given a document, add it to the index. Document retrieval: given a query, retrieve the most relevant documents from the index. The following diagram illustrates how this is done in Lucene. Index Structure Both documents and queries are represented as a bag of words. In Apache Lucene, "Document" is the basic unit for storage and retrieval. A "Document" contains multiple "Fields" (also called zones). Each "Field" contains multiple "Terms" (equivalent to words). To control how the document will be indexed across its fields, a Field can be declared in multiple ways to specify whether it should be analyzed (a pre-processing step during indexing), indexed (participates in the index) or stored (in case it needs to be returned in the query result): Keyword (not analyzed, indexed, stored); Unindexed (not analyzed, not indexed, stored); Unstored (analyzed, indexed, not stored); Text (analyzed, indexed, stored). The inverted index is the core data structure of the storage. It is organized in an inverted manner, from each term to the list of documents that contain the term. The list (known as a posting list) is ordered by a global ordering (typically by document id). To enable faster retrieval, the list is not just a single list but a hierarchy of skip lists. For simplicity, we ignore the skip lists in the subsequent discussion. This data structure is illustrated below, based on Lucene's implementation. It is stored on disk as segment files, which are brought into memory during processing. The above diagram only shows the inverted index. The whole index contains an additional forward index as follows. Document indexing A document in its raw form is extracted from a data adaptor.
(this can be calling a Web API to retrieve some text output, crawling a web page, or receiving an HTTP document upload). This can be done in a batch or online manner. When index processing starts, it parses each raw document and analyzes its text content. The typical steps include ... (1) Tokenize the document (break it down into words). (2) Lowercase each word (to make it case-insensitive, but be careful with names or abbreviations). (3) Remove stop words (take out high-frequency words like "the", "a", but be careful with phrases). (4) Stemming (normalize different forms of the same word, e.g. reduce "run", "running", "ran" to "run"). (5) Synonym handling. This can be done in two ways: either expand the term to include its synonyms (i.e. if the term is "huge", add "gigantic" and "big"), or reduce the term to a normalized synonym (i.e. if the term is "gigantic" or "huge", change it to "big"). At this point, the document is composed of multiple terms: doc = [term1, term2 ...]. Optionally, terms can be further combined into n-grams. After that we count the term frequencies of this document. For example, with a bi-gram expansion, the document will become ... doc1 -> {term1: 5, term2: 8, term3: 4, term1_2: 3, term2_3:1} We may also compute a "static score" based on some measure of the quality of the document. After that, we insert the document into the posting list of each term (including all n-grams), creating the posting list if it does not exist yet. This creates the inverted list structure shown in the previous diagram. There is a boost factor that can be set on the document or field. The boost factor effectively multiplies the term frequency, which in turn affects the importance of the document or field. A document can change the index in one of three ways: insertion, modification, or deletion. Typically the document is first added to a memory buffer, which is organized as an inverted index in RAM.
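The analysis and counting steps described above can be sketched in a few lines. This Python example is only an illustration and does not use Lucene's API: the function names, the tiny stop-word list, and the whitespace tokenizer are all assumptions, and stemming and synonym handling are left out for brevity.

```python
from collections import Counter, defaultdict

STOP_WORDS = {"the", "a", "an", "of", "and"}  # tiny illustrative list


def analyze(text: str) -> list:
    """Tokenize on whitespace, lowercase, and drop stop words."""
    tokens = [token.lower() for token in text.split()]
    return [token for token in tokens if token not in STOP_WORDS]


def term_frequencies(terms: list) -> Counter:
    """Count unigrams plus bi-grams built from adjacent terms."""
    bigrams = [f"{a}_{b}" for a, b in zip(terms, terms[1:])]
    return Counter(terms + bigrams)


def add_document(index: dict, doc_id: int, text: str) -> None:
    """Append (doc_id, frequency) to the posting list of every term."""
    for term, freq in term_frequencies(analyze(text)).items():
        index[term].append((doc_id, freq))


index = defaultdict(list)  # term -> posting list, created on first use
add_document(index, 1, "The quick fox")
add_document(index, 2, "A quick quick dog")
print(index["quick"])        # [(1, 1), (2, 2)]
print(index["quick_quick"])  # [(2, 1)]
```

Because documents are added in increasing id order, each posting list stays sorted by document id, which is the ordering the merge step relies on.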
- When it is a document insertion, it goes through the normal indexing process (as described above) to analyze the document and build an inverted list in RAM.
- When it is a document deletion (the client request only contains the doc id), the system fetches the document content from the forward index, then goes through the normal indexing process to analyze the document and build the inverted list. In this case, however, the doc object in the inverted list is labeled as "deleted".
- When it is a document update (the client request contains the modified document), it is handled as a deletion followed by an insertion: the system first fetches the old document from the forward index to build an inverted list with nodes marked "deleted", and then builds a new inverted list from the modified document. For example, if doc1 = "A B" is updated to "A C", the posting list will be {A: doc1(deleted) -> doc1, B: doc1(deleted), C: doc1}. After collapsing A, the posting list will be {A: doc1, B: doc1(deleted), C: doc1}.

As more and more documents are inserted into the memory buffer, it becomes full and is flushed to a segment file on disk. In the background, when M segment files have accumulated, Lucene merges them into bigger segment files. Notice that the size of the segment files at each level increases exponentially (M, M^2, M^3). This keeps the number of segment files that need to be searched per query at O(log N), where N is the number of documents in the index. Lucene also provides an explicit "optimize" call that merges all the segment files into one.

Let's go into a bit more detail on the merging process. Since the posting lists are already vertically ordered by term and horizontally ordered by doc id, merging two segment files S1 and S2 basically works as follows.

Walk the posting lists of both S1 and S2 together in sorted term order. For non-common terms (terms that appear in only one of S1 or S2), write the posting list out to a new segment S3.
When we find a common term T, we merge the corresponding posting lists from the two segments. Since both lists are sorted by doc id, we simply walk down both posting lists and write the doc objects out to a new posting list. When both posting lists contain the same doc (which is the case when the document has been updated or deleted), we pick the latest doc based on time order. Finally, the doc frequency of each posting list (of the corresponding term) is computed.

Document retrieval

Consider a document as a vector (each term is a separate dimension and the corresponding value is the tf-idf value), and the query as a vector too. The document retrieval problem can then be defined as finding the top-k documents most similar to a query, where similarity is defined as the dot product or cosine similarity between the document vector and the query vector.

tf-idf is a normalized frequency. TF (term frequency) represents how many times the term appears in the document (usually a compression function such as square root or logarithm is applied). IDF is the inverse of the document frequency, which is used to discount the significance of a term that appears in many other documents. There are many variants of TF-IDF, but generally it reflects the strength of association between the document (or query) and each term.

Given a query Q containing terms [t1, t2], here is how we fetch the corresponding documents. A common approach is the "document at a time" approach, where we traverse the posting lists of t1 and t2 concurrently (as opposed to the "term at a time" approach, where we traverse the whole posting list of t1 before we start on the posting list of t2). The traversal process is as follows.

For each term t1, t2 in the query, we identify the corresponding posting lists.

We walk the posting lists concurrently to return a sequence of documents (ordered by doc id). Notice that each returned document contains at least one of the query terms but can also contain several.
We compute the dynamic score, which is the dot product of the query vector and the document vector. Notice that we typically don't take the TF/IDF of the query into account (the query is short and we don't care about the frequency of each term in it). Therefore we can just sum the TF scores from each posting list that has a matching term, after applying the IDF score (stored at the head of each posting list). Lucene also supports query-level boosting, where a boost factor can be attached to the query terms. The boost factor multiplies the term frequency correspondingly.

We also look up the static score, which depends purely on the document (not the query). The total score is a linear combination of the static and dynamic scores.

Although the score used in the above calculation is based on the cosine similarity between the query and the document, we are not restricted to that. We can plug in any similarity function that makes sense for the domain (e.g. we can use machine learning to train a model that scores the similarity between a query and a document).

After we compute the total score, we insert the document into a heap data structure in which the top-K scored documents are maintained.

Here the whole posting list is traversed. If the posting list is very long, the response latency will be high. Is there a way to avoid traversing the whole list and still find the approximate top K documents? There are a couple of strategies we can consider.

Static Score Posting Order: Notice that the posting list is sorted according to a global order. This global ordering provides a monotonically increasing document id during the traversal, which is important for the "document at a time" traversal because it makes it impossible to visit the same document twice. This global ordering, however, can be quite arbitrary and doesn't have to be the document id. So we can choose an order based on the static score (e.g. a quality indicator of the document), which is global.
The idea is that we traverse the posting list in decreasing order of static score, so we are more likely to visit the documents with the higher total score (static + dynamic score).

Cut frequent terms: We do not traverse the posting lists of terms with a low IDF value (i.e. terms that appear in many documents, whose posting lists therefore tend to be long). This way we avoid traversing the long posting lists.

TopR list: For each posting list, we create an extra posting list that contains the top R documents with the highest TF (term frequency) in the original list. When we perform the search, we search in this topR list instead of the original posting list.

Since we have multiple inverted indexes (in the memory buffer as well as in the segment files at different levels), we need to combine their results. If termX appears in both segmentA and segmentB, the fresher version is picked. Freshness is determined as follows: the segment at a lower level (smaller size) is considered fresher. If the two segment files are at the same level, the one with the higher number is fresher. On the other hand, the IDF value will be the sum of the corresponding IDF of each posting list in the segment files (the value will be slightly off if the same document has been updated, but the discrepancy is negligible).

However, consolidating multiple segment files incurs processing overhead during document retrieval. Lucene provides an explicit "optimize" call to merge all segment files into a single file, so there is no need to look at multiple segment files during document retrieval.

Distributed Index

For a large corpus (like web documents), the index is typically distributed across multiple machines. There are two models of distribution: term partitioning and document partitioning. In document partitioning, documents are randomly spread across different partitions, where the index is built.
In term partitioning, the terms are spread across different partitions. We'll discuss document partitioning as it is more commonly used. A distributed index is provided by other technologies built on top of Lucene, such as ElasticSearch. A typical setting is as follows.

In this setting, machines are organized in columns and rows. Each column represents a partition of documents while each row represents a replica of the whole corpus.

During document indexing, a row of machines is first randomly selected and allocated for building the index. When a new document is crawled, a column machine from the selected row is randomly picked to host the document. The document is sent to this machine, where the index is built. The updated index is later propagated to the other replica rows.

During document retrieval, a row of replica machines is first selected. The client query is then broadcast to every column machine in the selected row. Each machine performs the search in its local index and returns its TopM elements to the query processor, which consolidates the results before sending them back to the client. Notice that K/P < M < K, where K is the number of top documents the client expects and P is the number of columns of machines. M is a parameter that needs to be tuned.

One caveat of this distributed index is that, as the posting lists are split horizontally across partitions, we lose the global view of the IDF value, without which a machine is unable to calculate the TF-IDF score. There are two ways to mitigate that.

Do nothing: here we assume the documents are evenly spread across the partitions, so the local IDF is a good approximation of the actual IDF.

Extra round trip: In the first round, the query is broadcast to every column, which returns its local IDF. The query processor collects all the IDF responses and computes the sum of the IDFs.
In the second round, it broadcasts the query along with the IDF sum to each column of machines, which compute their local scores based on the IDF sum.
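The "extra round trip" strategy can be sketched in a few lines of Python. This is only a toy model under stated assumptions: the Partition class and its method names are hypothetical, the square-root TF and the log(1 + N/df) IDF formula are one choice among many, and instead of summing IDF values directly the sketch aggregates raw document counts and per-term document frequencies from each column, then derives a global IDF from those totals.

```python
import heapq
import math
from collections import Counter


class Partition:
    """One column machine holding a slice of the corpus (doc_id -> term counts)."""

    def __init__(self, docs):
        self.docs = docs

    def local_stats(self, terms):
        """Round 1: report the local corpus size and per-term document frequency."""
        df = Counter(t for tf in self.docs.values() for t in terms if t in tf)
        return len(self.docs), df

    def top_m(self, terms, idf, m):
        """Round 2: score local documents with the global IDF, return the best m."""
        scored = []
        for doc_id, tf in self.docs.items():
            score = sum(math.sqrt(tf[t]) * idf[t] for t in terms if t in tf)
            if score > 0:
                scored.append((score, doc_id))
        return heapq.nlargest(m, scored)


def search(partitions, terms, k, m):
    """Query processor: gather statistics, broadcast global IDF, merge top-M lists."""
    total_docs, total_df = 0, Counter()
    for p in partitions:
        n, df = p.local_stats(terms)
        total_docs += n
        total_df.update(df)
    # Terms that appear in no partition contribute nothing and are dropped.
    idf = {t: math.log(1 + total_docs / total_df[t]) for t in terms if total_df[t]}
    known = [t for t in terms if t in idf]
    candidates = [hit for p in partitions for hit in p.top_m(known, idf, m)]
    return heapq.nlargest(k, candidates)


partitions = [
    Partition({1: {"lucene": 4, "index": 1}, 2: {"index": 4}}),
    Partition({3: {"lucene": 1}, 4: {"search": 2}}),
]
result = search(partitions, ["lucene", "index"], k=3, m=2)
print([doc_id for _score, doc_id in result])
```

With P = 2 columns, K = 3 and M = 2, the example respects the K/P < M < K guideline: each column returns fewer hits than the client asked for, yet the merged candidates are still enough to fill the top K.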
February 26, 2013
by Ricky Ho
· 9,285 Views
Solving RPM installation conflicts
This post comes from Ignacio Nin at the MySQL Performance Blog.

Lately we’ve had many reports of the RPM packages for CentOS 5 (mostly) and CentOS 6 having issues when installing different combinations of our products, particularly with Percona Toolkit. Examples of bugs related to these issues are lp:1031427 and lp:1051874.

These problems arise when trying to install a package from the distribution that is linked against the version of libmysqlclient.so shipped by the distribution (libmysqlclient.so.15 for CentOS 5/libmysqlclient.so.16 for CentOS 6) and a version of Percona Server that depends on another version of libmysqlclient.so, usually more recent. Bug lp:1031427 is an example of this, and shows how the packages would conflict when trying to install libmysqlclient.so. For example, when installing php-mysql alongside PS 5.5 in CentOS 6:

    # yum -q install Percona-Server-server-55 php-mysql
    Installing:
     Percona-Server-server-55   x86_64  5.5.29-rel29.4.401.rhel6  percona  15 M
     php-mysql                  x86_64  5.3.3-14.el6_3            updates  79 k
    Installing for dependencies:
     Percona-Server-client-55   x86_64  5.5.29-rel29.4.401.rhel6  percona  7.0 M
     Percona-Server-shared-51   x86_64  5.1.67-rel14.3.506.rhel6  percona  2.8 M
     Percona-Server-shared-55   x86_64  5.5.29-rel29.4.401.rhel6  percona  787 k

    Transaction Summary
    ================================================================================
    Install       5 Package(s)

    Is this ok [y/N]: y

    Transaction Check Error:
      file /usr/lib64/libmysqlclient.so conflicts between attempted installs of Percona-Server-shared-51-5.1.67-rel14.3.506.rhel6.x86_64 and Percona-Server-shared-55-5.5.29-rel29.4.401.rhel6.x86_64
      file /usr/lib64/libmysqlclient_r.so conflicts between attempted installs of Percona-Server-shared-51-5.1.67-rel14.3.506.rhel6.x86_64 and Percona-Server-shared-55-5.5.29-rel29.4.401.rhel6.x86_64

The traditional solution for this situation was to provide a special package,
Percona-Server-shared-compat (modeled after upstream’s MySQL-shared-compat), which would contain ALL versions of libmysqlclient.so.* together and wouldn’t conflict. Probably some of you are familiar with this approach.

    # yum -q install Percona-Server-server-55 Percona-Server-shared-compat php-mysql
    Installing:
     Percona-Server-server-55      x86_64  5.5.29-rel29.4.401.rhel6  percona  15 M
     Percona-Server-shared-compat  x86_64  5.5.29-rel29.4.401.rhel6  percona  3.4 M
     php-mysql                     x86_64  5.3.3-14.el6_3            updates  79 k
    Installing for dependencies:
     Percona-Server-client-55      x86_64  5.5.29-rel29.4.401.rhel6  percona  7.0 M
     Percona-Server-shared-55      x86_64  5.5.29-rel29.4.401.rhel6  percona  787 k

    Transaction Summary
    ================================================================================
    Install       5 Package(s)

Notice how PS-shared-compat installs alongside the -shared package, providing the older libmysqlclient.so.16 required by php-mysql.

However, this has proved non-intuitive and problematic, since the shared-compat package wouldn’t get selected unless explicitly installed, and many of our users would rather have it “just work” without requiring additional knowledge of what the particular workaround was.

We’re now trying a solution in which our -shared packages no longer conflict at libmysqlclient.so, so we are able to install them side-by-side, modelled after the mysql-libs packages provided by CentOS/Red Hat. So even if the user wants to install PS 5.5 alongside packages that depend on 5.1/5.0, the -shared packages will work together.
For example, installing 5.5 and postfix in CentOS:

    # yum -q install Percona-Server-server-55 postfix
    Installing:
     Percona-Server-server-55  x86_64  5.5.29-rel29.4.402.rhel5  percona-testing  19 M
     postfix                   x86_64  2:2.3.3-6.el5             base             3.8 M
    Installing for dependencies:
     Percona-SQL-shared-50     x86_64  5.0.92-b23.89.rhel5       percona-testing  1.8 M
     Percona-Server-client-55  x86_64  5.5.29-rel29.4.402.rhel5  percona-testing  9.1 M
     Percona-Server-shared-55  x86_64  5.5.29-rel29.4.402.rhel5  percona-testing  993 k

… and this will install without problems.

Additionally, this has the advantage of allowing an upgrade from 5.1 to 5.5 without uninstalling any software that depended on the old version.

    # rpm -qa | grep ^Percona
    Percona-Server-client-51-5.1.67-rel14.3.507.rhel6.x86_64
    Percona-Server-shared-51-5.1.67-rel14.3.507.rhel6.x86_64
    Percona-Server-server-51-5.1.67-rel14.3.507.rhel6.x86_64

In this case only Percona-Server-client-51 and Percona-Server-server-51 need be removed, allowing any package that depends on Percona-Server-shared-51 (providing libmysqlclient.so.16) to remain installed. After the server and client packages are uninstalled, you can install PS 5.5 without conflict.

The current package candidates for versions 5.0.92 (which required an update), 5.1.67-14.3 and 5.5.29-29.4 can be tested from the percona-testing repository. We encourage you to try these out and send us your feedback and/or file any bugs you find. Installation instructions for Percona Testing repositories.

We’re aiming to include these fixes in our next releases of 5.1 and 5.5. Percona Toolkit users in particular will enjoy this update since it’ll mean no more trouble when installing it from repository!
February 25, 2013
by Peter Zaitsev
· 7,814 Views
Building SOLID Databases: Dependency Inversion and Robust DB Interfaces
Dependency inversion is the idea that interfaces should depend on abstractions, not on specifics. According to Wikipedia, the principle states:

A. High-level modules should not depend on low-level modules. Both should depend on abstractions.
B. Abstractions should not depend upon details. Details should depend upon abstractions.

Of course the second part of this principle is impossible if read literally. You can't have an abstraction until you know what details are to be covered, and so the abstraction and the details are co-dependent. If the covered details change sufficiently, the abstraction will become either leaky or inadequate, and so it is worth seeing these as intertwined to some extent.

The focus on abstraction is helpful because it suggests that the interface contract should be designed in such a way that neither side really has to understand any internal details of the other in order to make things work. Both sides depend on well-encapsulated APIs and neither side has to worry about what the other side is really doing. This is what is meant by details depending on abstractions rather than the other way around.

This concept is quite applicable beyond object-oriented programming because it covers a very basic aspect of API contract design, namely how well an API should encapsulate behavior. The principle was first formulated in its current form in the object-oriented programming paradigm but is generally applicable elsewhere.

SQL as an Abstraction Layer, or Why RDBMS are Still King

There are plenty of reasons to dislike SQL, such as the fact that nulls are semantically ambiguous. As a basic disclaimer, I am not holding SQL up as a paragon of programming languages or even db interfaces, but I think it is important to discuss what SQL does right in this regard.

SQL is generally understood to be a declarative language which approximates relational mathematics for database access purposes.
With SQL, you specify what you want returned, not how to get it, and the planner determines the best way to get it. SQL is thus an interface language rather than a programming language per se. With SQL, you can worry about the logical structure, leaving the implementation details to the db engine.

SQL queries are basically very high-level specifications of operations, not detailed descriptions of how to do something efficiently. Even update and insert statements (which are by nature more imperative than select statements) leave the underlying implementation entirely to the database management system.

I think that this, along with the many concessions the language has made to real-world requirements (such as bags instead of sets and the addition of ordering to bags), largely accounts for the success of this language. SQL, in essence, encapsulates a database behind a mature mathematical, declarative model in the same way that JSON and REST do (in a much less comprehensive way) in many NoSQL dbs. SQL provides encapsulation, interface, and abstraction in a very full-featured way, and this is why it has been so successful.

SQL Abstraction as Imperfect

One obvious problem with treating SQL as an abstraction layer in its own right is that one is frequently unable to write details in a way that is clearly separate from the interface. Often storage tables are hit directly, so there is little separation between logical detail and logical interface, and this can break down when database complexity reaches a certain size. Approaches to managing this problem include using stored procedures or user defined functions, and using views to encapsulate storage tables.

Stored Procedures and User Defined Functions Done Wrong

Of the above methods, stored procedures and functional interfaces frequently have bad reputations because of the bad experiences many people have had with them.
These include developers pushing too much logic into stored procedures, and the fact that defining functional interfaces in this way usually produces a very tight binding between database code and application code, often leading to maintainability problems.

The first case is quite obvious, and includes the all-too-frequent case of trying to send emails directly from stored procedures (always a bad idea). This mistake leads to certain types of problems, including the fact that ACID-compliant operations may be mixed with non-ACID-compliant ones, leading to cases where a transaction can only be partially rolled back. Oops, we didn't actually record the order as shipped, but we told the customer it was... MySQL users will also note this is an argument against mixing transactional and nontransactional backend table types in the same db. However, that problem is outside the scope of this post. Additionally, MySQL is not well suited to serving many applications against a single set of db relations.

The second problem, though, is more insidious. In the traditional way stored procedures and user defined functions are used, the application has to be deeply aware of the interface to the database, but the two are rolled out separately, leading to the possibility of service interruptions and a need to time the rollout of db changes very carefully and closely with application changes. As more applications use the database, this becomes harder and the chance of something being overlooked becomes greater.

For this reason the idea that all operations must go through a set of stored procedures is a decision fraught with hazard as the database and application environment evolves. Typically it is easier to manage backwards-compatibility in schemas than it is in functions, and so a key question is how many opportunities you have to create new bugs when a new column is added.
There are, of course, more hazards which I have dealt with before, but the point is that stored procedures are potentially harmful, and a major part of the reason is that they usually form a fairly brittle contract with the application layer. In a traditional stored procedure, adding a column to be stored requires changing the number of variables in the stored procedure's argument list, the queries that access it, and each application's call to that stored procedure. In this way, they provide (in the absence of other help) at best a leaky abstraction layer around the database details. This is the sort of problem that dependency inversion helps to avoid.

Stored Procedures and User Defined Functions Done Right

Not all stored procedures are done wrong. In the LedgerSMB project we have at least partially solved the abstraction/brittleness issue by looking to web services for inspiration. Our approach provides an additional mapping layer and dynamic query generation around a stored procedure interface. By using a service locator pattern, and overloading the system tables in PostgreSQL as the service registry, we solve the problem of brittleness.

Our approach of course is not perfect and it is not the only possibility. One shortcoming of our approach is that the invocation of the service locator is relatively spartan. We intend to allow more options there in the future. However, one thing I have noticed is that there are far fewer places where bugs can hide, and therefore faster and more robust development takes place. Additionally, a focus on clarity of code in stored procedures has eliminated a number of important performance bottlenecks, and it limits the number of places a given change propagates to.

Other Important Options in PostgreSQL

Stored procedures are not the only abstraction mechanisms available from PostgreSQL.
In addition to views, there are also other interesting ways of using functions to accomplish this without insisting that all access goes through stored procedures. These methods can also be freely mixed to produce very powerful, intelligent database systems.

Such options include custom types, written in C, along with custom operators, functions and the like. These would then be stored in columns, and SQL can be used to provide an abstraction layer around the types. In this way SQL becomes the abstraction and the C programs become the details. A future post will cover the use of ip4r in network management with PostgreSQL dbs as an example of what can be done here.

Additionally, things like triggers and notifications can be used to ensure that appropriate changes trigger other changes in the same transaction or, upon transaction commit, hand off control to other programs in subsequent transactions (allowing for independent processing and error control for things like sending emails).

Recommendations

Rather than specific recommendations, the overall point here is to look at the database itself as an application running in an application server (the RDBMS) and design it as an application with an appropriate API. There are many ways to do this, from writing components in C and using SQL as an abstraction mechanism to writing things in SQL and using stored procedures as a mechanism. One could even write code in SQL and still use SQL as an abstraction mechanism.

The key point however is to be aware of the need for discoverable abstraction, a need which to date things like ORMs and stored procedures often fill very imperfectly. A well-designed db with appropriate abstraction in its interfaces should be able to be seen as an application in its own right, engineered as such, and capable of serving multiple client apps through a robust and discoverable API.
As with all things, it starts by recognizing the problems and putting solutions as priorities from the design stage onward.
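To make the service locator idea from "Stored Procedures and User Defined Functions Done Right" a little more concrete, here is a hedged Python sketch. It is not LedgerSMB's code: the CATALOG dictionary stands in for a lookup against PostgreSQL's system tables, and the function name customer__save, the "in_" argument prefix and the build_call helper are all assumptions made for illustration.

```python
# Stand-in for the database's function catalog: name -> declared argument names.
# In the approach the article describes, this information would be discovered
# from PostgreSQL's system tables; the entry here is hypothetical.
CATALOG = {
    "customer__save": ["in_id", "in_name", "in_email"],
}


def build_call(catalog, func_name, properties):
    """Map object properties onto a function's declared arguments by name."""
    arg_names = catalog[func_name]  # unknown functions raise KeyError
    params = []
    for arg in arg_names:
        # Strip the assumed "in_" prefix to find the matching property;
        # properties the caller did not supply are passed as NULL (None).
        key = arg[3:] if arg.startswith("in_") else arg
        params.append(properties.get(key))
    placeholders = ", ".join(["%s"] * len(arg_names))
    return f"SELECT * FROM {func_name}({placeholders})", params


sql, params = build_call(
    CATALOG,
    "customer__save",
    {"name": "Ada", "email": "ada@example.com", "notes": "vip"},
)
print(sql, params)
```

The point of the design is that the caller never spells out the argument list. If the function later gains an argument, only the catalog entry (that is, the function definition itself) changes, and existing callers keep working as long as a NULL is acceptable for the new argument.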
February 19, 2013
by Chris Travers
· 5,240 Views
Neo4j/Cypher: SQL Style GROUP BY Functionality
As I mentioned in a previous post, I’ve been playing around with some football related data over the last few days, and one query I ran (using Cypher) was to find all the players who’ve been sent off this season in the Premiership.

The model in the graph around sending offs looks like this:

My initial query looked like this:

    START player = node:players('name:*')
    MATCH player-[:sent_off_in]-game-[:in_month]-month
    RETURN player.name, month.name

First we get the names of all the players, which are stored in an index, then we follow relationships to the games they were sent off in, and then find which months those games were played in. That query returns:

    +----------------------------+
    | player.name  | month.name  |
    +----------------------------+
    | "Jenkinson"  | "February"  |
    | "Chico"      | "September" |
    | "Odemwingie" | "September" |
    | "Agger"      | "August"    |
    | "Cole"       | "December"  |
    | "Whitehead"  | "August"    |
    ...
    +----------------------------+

I thought it’d be interesting to see how many sending offs there were in each month, which we’d achieve in SQL by making use of a GROUP BY. Cypher has a bunch of aggregation functions which allow us to achieve the same outcome. In our case we want to use the COUNT function and we want our grouping key to be the month of the year, so we need to include that as part of our RETURN statement as well:

    START player = node:players('name:*')
    MATCH player-[:sent_off_in]-game-[:in_month]-month
    RETURN COUNT(player.name) AS numberOfReds, month.name
    ORDER BY numberOfReds DESC

which returns:

    +----------------------------+
    | numberOfReds | month.name  |
    +----------------------------+
    | 7            | "October"   |
    | 6            | "December"  |
    | 4            | "September" |
    | 4            | "November"  |
    | 3            | "August"    |
    | 2            | "January"   |
    | 2            | "February"  |
    +----------------------------+

As far as I can tell, anything which isn’t an aggregate function is used as part of the grouping key, which means we could include more than one field in our grouping key.
This isn’t particularly relevant for us for this particular query but would become useful if we add the teams that the players play for.

I extended the graph to include a player’s statistics for each game, which also includes a relationship indicating which team they played for in a specific game. The model now looks like this:

It does now look quite a bit more complicated, but this was the best way I could think of modelling player specific details for a match. I couldn’t see another way of modelling the fact that a player played for a certain team in a match, which I want to use for some other queries, but if you can see a simpler way please let me know.

To get a list of the red cards and the name of the team the offender played for we can write the following query:

    START player = node:players('name:*')
    MATCH player-[:sent_off_in]-game-[:in_month]-month,
          game-[:in_match]-stats-[:stats]-player,
          stats-[:played_for]-team
    RETURN player.name, month.name, team.name
    ORDER BY month.name

The original query traversed a path from a player to the games they were sent off in and then from the games to the month the game was played in. We’ve now added a traversal from the game to the game stats for that player, and we also traverse from the game stats to the team node that the player played for in that game.

When we run this we get the following results:

    +--------------------------------------------+
    | player.name  | month.name  | team.name     |
    +--------------------------------------------+
    | "Agger"      | "August"    | "Liverpool"   |
    | "Whitehead"  | "August"    | "Stoke"       |
    ...
    | "Shotton"    | "December"  | "Stoke"       |
    | "Nzonzi"     | "December"  | "Stoke"       |
    | "Jenkinson"  | "February"  | "Arsenal"     |
    ...
    | "Ivanovic"   | "October"   | "Chelsea"     |
    | "Torres"     | "October"   | "Chelsea"     |
    +--------------------------------------------+

So we can see that Stoke got 2 players sent off in December and Chelsea got 2 sent off in October.
We can write the following query to return a result set which uses team and month as the grouping key, i.e. we count how many paths there are which have the same team and month:

    START player = node:players('name:*')
    MATCH player-[:sent_off_in]-game-[:in_month]-month,
          game-[:in_match]-stats-[:stats]-player,
          stats-[:played_for]-team
    RETURN month.name, team.name, COUNT(player.name) AS numberOfReds
    ORDER BY numberOfReds DESC

When we run that query we see the following results:

    +--------------------------------------------+
    | month.name  | team.name     | numberOfReds |
    +--------------------------------------------+
    | "December"  | "Stoke"       | 2            |
    | "October"   | "Chelsea"     | 2            |
    ...
    | "August"    | "Stoke"       | 1            |
    | "November"  | "Tottenham"   | 1            |
    | "December"  | "Everton"     | 1            |
    +--------------------------------------------+

This is all explained in more detail in the documentation, but I thought it’d be interesting to write about it from the perspective of someone more used to writing SQL and trying to work out how to achieve the same thing in Cypher.
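For readers who think in neither SQL nor Cypher, the implicit grouping rule can be mimicked in plain Python. This is just an analogy, not how Neo4j evaluates the query; the rows below are only the ones visible in the truncated result table above, so the counts cover that excerpt rather than the full data set.

```python
from collections import Counter

# Only the rows shown in the truncated (player, month, team) result table.
rows = [
    ("Agger", "August", "Liverpool"),
    ("Whitehead", "August", "Stoke"),
    ("Shotton", "December", "Stoke"),
    ("Nzonzi", "December", "Stoke"),
    ("Jenkinson", "February", "Arsenal"),
    ("Ivanovic", "October", "Chelsea"),
    ("Torres", "October", "Chelsea"),
]

# Every non-aggregated column (month, team) becomes part of the grouping key;
# counting the rows per key plays the role of COUNT(player.name).
reds_by_month_team = Counter((month, team) for _player, month, team in rows)

# Rough equivalent of ORDER BY numberOfReds DESC.
for (month, team), number_of_reds in reds_by_month_team.most_common():
    print(month, team, number_of_reds)
```

Dropping team from the key, i.e. counting by month alone, corresponds to the earlier query that returned only COUNT(player.name) and month.name.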
February 19, 2013
by Mark Needham
· 27,290 Views
XML->JSON->HashMap
Yes, it has been a long time since I posted. I was just trying to see how an XML document can be converted to JSON and then to a HashMap. The situation is entirely imaginary.
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import net.sf.json.JSON;
    import net.sf.json.xml.XMLSerializer;
    import org.apache.commons.io.IOUtils;
    import org.codehaus.jackson.JsonGenerationException;
    import org.codehaus.jackson.map.JsonMappingException;
    import org.codehaus.jackson.map.ObjectMapper;
    import org.codehaus.jackson.type.TypeReference;

    public class XML2JSONConvertor {
        public static void main(String[] args) throws Exception {
            InputStream is = new FileInputStream(new File(
                    "e:\\jagannathan\\personal\\java-projects\\secondtest.xml"));
            String xml = IOUtils.toString(is);
            // Convert the XML text to a JSON structure with json-lib
            XMLSerializer xmlSerializer = new XMLSerializer();
            JSON json = xmlSerializer.read(xml);
            System.out.println(json.toString(2));
            printJSON(json.toString(2));
        }

        public static void printJSON(String jsonString) {
            ObjectMapper mapper = new ObjectMapper();
            try {
                // Read the JSON text into a Map with Jackson
                Map<String, Object> jsonInMap = mapper.readValue(jsonString,
                        new TypeReference<Map<String, Object>>() {
                        });
                List<String> keys = new ArrayList<String>(jsonInMap.keySet());
                for (String key : keys) {
                    System.out.println(key + ": " + jsonInMap.get(key));
                }
            } catch (JsonGenerationException e) {
                e.printStackTrace();
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
Dependencies
    net.sf.json-lib : json-lib : 2.4 (classifier jdk15)
    commons-io : commons-io : 2.3 (scope compile)
    xom : xom : 1.2.5
    org.codehaus.jackson : jackson-mapper-asl : 1.9.0
The Input XML
    Jags Inc Jagan Male 24-jul Satya Male 24-apr
The output
    7 Feb, 2013 7:20:50 PM net.sf.json.xml.XMLSerializer getType
    INFO: Using default type string
    {
      "name": "Jags Inc",
      "employees": [
        { "name": "Jagan", "sex": "Male", "dob": "24-jul" },
        { "name": "Satya", "sex": "Male", "dob": "24-apr" }
      ]
    }
    name: Jags Inc
    employees: [{name=Jagan, sex=Male, dob=24-jul}, {name=Satya, sex=Male, dob=24-apr}]
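For comparison, here is a rough sketch of going from XML straight to a nested Map using only the JDK's DOM parser. This is a different technique from the json-lib plus Jackson route above, not a drop-in equivalent: repeated child elements become a List under their own element name. The class name and the element names (`company`, `employees`, `employee`) are assumptions chosen to resemble the output shown above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical helper, not part of json-lib or Jackson.
class XmlToMapSketch {

    /** Parses an XML string; returns a Map for nested elements or a String for a text-only root. */
    static Object parse(String xml) {
        try {
            Element root = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
            return convert(root);
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML", e);
        }
    }

    @SuppressWarnings("unchecked")
    private static Object convert(Element element) {
        Map<String, Object> children = new LinkedHashMap<>();
        NodeList nodes = element.getChildNodes();
        for (int i = 0; i < nodes.getLength(); i++) {
            Node node = nodes.item(i);
            if (node.getNodeType() != Node.ELEMENT_NODE) {
                continue; // skip text and whitespace between elements
            }
            Object value = convert((Element) node);
            Object existing = children.get(node.getNodeName());
            if (existing == null) {
                children.put(node.getNodeName(), value);
            } else if (existing instanceof List) {
                ((List<Object>) existing).add(value); // third or later repeat
            } else {
                List<Object> repeated = new ArrayList<>(); // second occurrence: switch to a list
                repeated.add(existing);
                repeated.add(value);
                children.put(node.getNodeName(), repeated);
            }
        }
        // An element without child elements is treated as plain text
        return children.isEmpty() ? element.getTextContent().trim() : children;
    }

    public static void main(String[] args) {
        String xml = "<company><name>Jags Inc</name><employees>"
                + "<employee><name>Jagan</name><sex>Male</sex><dob>24-jul</dob></employee>"
                + "<employee><name>Satya</name><sex>Male</sex><dob>24-apr</dob></employee>"
                + "</employees></company>";
        System.out.println(parse(xml));
    }
}
```

Attributes and mixed content are ignored in this sketch, so it only suits simple, element-only documents.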
February 18, 2013
by Jagannathan Asokan
· 33,522 Views
Building an Online-Recommendation Engine with MongoDB
Once upon a time there was a Munich pizza baker who developed a technique to beam pizza out of bright sunshine. He can produce more than a thousand pizzas per second, needs a channel to sell that many pizzas, and decides to build an online shop. Mario's initial idea was to sell pizzas, but now he is thinking about introducing new product lines such as beverages, salads and pasta. Before we look at the validation of Mario's idea, let's take a short look at the existing online shop. Mario's online shop is based on MongoDB, Apache Wicket and Spring. MongoDB is a document-oriented NoSQL database. MongoDB stores records not in tables, as a relational database does, but in BSON documents. BSON is a binary version of JSON (JavaScript Object Notation) and is very similar to the object structure in Mario's application. Using MongoDB makes his development easier and deployment faster. The figure shows a JSON document which is very similar to a Java object: a JSON document property with its value corresponds to the Java object property with the same value. You can add or remove properties in your Java object and this will automatically change your database schema. So there is no need to map your Java object model onto a relational schema via Hibernate. Mario also decided to build his online shop only with open-source technologies like Apache Wicket and Spring. Wicket is a widely used, lightweight, component-based web application framework that is closely patterned after stateful GUI frameworks such as JavaFX. The Spring Framework is an open source application framework and inversion of control container for the Java platform and does not impose any specific programming model. Spring has become popular in the Java community as an alternative to, replacement for, or even addition to the Enterprise JavaBean (EJB) model. Because of this architecture Mario is able to deploy his application in a lightweight application server like Tomcat or Jetty.
This figure shows Mario's system landscape. Mario has two major systems: on the left-hand side is his online shop and on the right-hand side is 'pas', a famous billing system. In the middle is Hadoop, which connects the two systems. In the business world an application normally does not stand alone. In most cases an application must communicate with others. The lean architecture of Mario's online shop enables him to connect the billing system 'pas' to his online shop. Spring for Apache Hadoop provides this integration between the two systems, the online shop and 'pas'. Hadoop supports data-intensive distributed applications and implements a computational paradigm named MapReduce, where the computation is divided into many small fragments, each of which may be executed or re-executed on any node in a cluster of commodity hardware. Mario uses Hadoop as an ETL layer that enables him to transfer gigabytes of order information into the billing system. In this case Hadoop makes it possible for a financial controller to verify that all orders were billed correctly. In addition to the online shop, Mario has a real-time sales dashboard that enables him to track his sales in real time. The dashboard displays daily and monthly sales statistics for each pizza and contains a map with a geographical overview of customer activity and competitor locations. Here is a walkthrough of the shop: Now let's talk about Mario's incredible new idea: Mario wants to sell even more pizza! And other products as well. Mario decides to use lean startup methods to test the possible introduction of new product lines and plans an experiment to validate his new idea using a scientific approach and pure facts instead of hunches. Mario's core assumption is that customers want to buy products other than pizza: drinks, salads and pasta. Furthermore he is worried about pricing.
Mario asks all customers to complete a survey and offers an incentive for participating: a free pizza for every customer who responds to the survey. The result of the survey validated Mario's assumption: customers want to buy beverages, salads and pasta. But he also found out that his customers are willing to pay higher prices for high-quality products and that they simply love his easy shopping flow. Currently a pizza order can be completed with only three clicks, so there is a new riskiest assumption to validate: will a more complex shopping flow affect his sales? The figure shows a validation board. A validation board is a deceptively simple tool for testing out product ideas. Furthermore, a validation board tracks the pivots that follow from customer feedback. Mario decides to introduce beverage, salad and pasta product lines and thinks about how he can extend the product line without destroying the easy shopping flow. That's why Mario thinks a recommendation engine is the right way for him. Panels for recommendations can be integrated into the online shop without changing the shopping flow. Mario hired a statistician to help him implement a recommender system for his online shop for better cross-selling. He also defined new measurement points to validate his new idea: he tracks the conversion rate of orders as well as cross-selling rates, and every event in the online shop is already tracked in real time. So Mario can very easily perform further experiments in order to verify more assumptions. Follow the blog to see how the story continues, or come to the MongoDB User Group meetup in Munich, February 20, 2013, or MongoDB Days in Berlin, February 26, 2013, to get a live presentation. Our talk sheds light on how to build an online recommendation engine based on MongoDB and Apache Mahout. We'll show which recommenders must be built to reach Mario's goal and how these can be integrated into Mario's shop infrastructure.
February 17, 2013
by Comsysto Gmbh
· 8,462 Views
Better explaining the CAP Theorem
Today, I thought a lot about how to examine different databases. Choosing a database is often a daunting task. There's a lot of confusion, a 'theorem', and above all the immortal proverb 'one size does not fit all'. As if it helps. One of the first things that you realize when examining NoSQL distributed databases (and how could you not) is that these days databases are like cars: they're all good. Old-fashioned SQL databases can scale in and out, horizontally sharded over several machines to achieve high availability. NoSQL systems claim to be consistent. What difference, then, does it make which database you choose? The availability and consistency that I mentioned come, of course, from the misunderstood CAP theorem, which (so people say) states that you can only choose 2 out of the 3:
Consistency: every read gets you the most recent write
Availability: every node (if not failed) always executes queries
Partition-tolerance: even if the connections between nodes are down, the other two promises (A & C) are kept
Usually it's depicted as a nice equilateral triangle, as in this one from ofirm: There's a nice proof and explanation of it in this 4 minute video here. But if we think about it, and also look at some of the later remarks by Brewer (the theorem's author), we'll see that the 2 out of 3 is really 1 out of 2: it's really just A vs C! And this is simply because:
Availability is achieved by replicating the data across different machines
Consistency is achieved by updating several nodes before allowing further reads
Total partitioning, meaning failure of part of the system, is rare. However, we could look at a delay, a latency, of the update between nodes as a temporary partitioning. It will then cause a temporary decision between A and C:
On systems that allow reads before updating all the nodes, we will get high availability
On systems that lock all the nodes before allowing reads, we will get consistency
That's it!
And since this decision is temporary, existing only for the duration of the delay, some may say that we are really contrasting latency (another word for availability) against consistency. By the way, there's no distributed system that wants to live with "partitioning": if it does, it's not distributed. That is why putting SQL in this triangle may lead to confusion.
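The "temporary partitioning" argument above can be illustrated with a toy two-node store. This is only a sketch under simplifying assumptions: the class and method names are invented, replication happens when `replicate()` is called rather than over a network, and the "consistent" read refuses to answer instead of blocking.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model: one primary, one replica, manual replication.
class ReplicatedStoreSketch {
    private final Map<String, String> primary = new HashMap<>();
    private final Map<String, String> replica = new HashMap<>();
    private boolean replicaInSync = true;

    /** Writes land on the primary first; the replica lags until replicate() runs. */
    void write(String key, String value) {
        primary.put(key, value);
        replicaInSync = false;
    }

    /** Simulates the delayed update finally reaching the replica. */
    void replicate() {
        replica.putAll(primary);
        replicaInSync = true;
    }

    /** The "A" choice: always answer from the replica, even if the value may be stale. */
    String readAvailable(String key) {
        return replica.get(key);
    }

    /** The "C" choice: refuse to answer until the replica has caught up. */
    String readConsistent(String key) {
        if (!replicaInSync) {
            throw new IllegalStateException("replica is still catching up");
        }
        return replica.get(key);
    }

    public static void main(String[] args) {
        ReplicatedStoreSketch store = new ReplicatedStoreSketch();
        store.write("x", "1");
        store.replicate();
        store.write("x", "2"); // not yet replicated
        System.out.println("available read: " + store.readAvailable("x"));
        store.replicate();
        System.out.println("consistent read: " + store.readConsistent("x"));
    }
}
```

During the window between `write` and `replicate`, the available read answers with the old value while the consistent read declines to answer, which is the A versus C decision described above in miniature.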
February 17, 2013
by Lior Messinger
· 139,330 Views · 18 Likes
CPU Cache Flushing Fallacy
Even from highly experienced technologists I often hear talk about how certain operations cause a CPU cache to "flush". This seems to be illustrating a very common fallacy about how CPU caches work, and how the cache sub-system interacts with the execution cores. In this article I will attempt to explain the function CPU caches fulfil, and how the cores, which execute our programs of instructions, interact with them. For a concrete example I will dive into one of the latest Intel x86 server CPUs. Other CPUs use similar techniques to achieve the same ends. Most modern systems that execute our programs are shared-memory multi-processor systems in design. A shared-memory system has a single memory resource that is accessed by 2 or more independent CPU cores. Latency to main memory is highly variable from 10s to 100s of nanoseconds. Within 100ns it is possible for a 3.0GHz CPU to process up to 1200 instructions. Each Sandy Bridge core is capable of retiring up to 4 instructions-per-cycle (IPC) in parallel. CPUs employ cache sub-systems to hide this latency and allow them to exercise their huge capacity to process instructions. Some of these caches are small, very fast, and local to each core; others are slower, larger, and shared across cores. Together with registers and main-memory, these caches make up our non-persistent memory hierarchy. Next time you are developing an important algorithm, try pondering that a cache-miss is a lost opportunity to have executed ~500 CPU instructions! This is for a single-socket system, on a multi-socket system you can effectively double the lost opportunity as memory requests cross socket interconnects. Memory Hierarchy Figure 1. For the circa 2012 Sandy Bridge E class servers our memory hierarchy can be decomposed as follows: Registers: Within each core are separate register files containing 160 entries for integers and 144 floating point numbers. 
These registers are accessible within a single cycle and constitute the fastest memory available to our execution cores. Compilers will allocate our local variables and function arguments to these registers. When hyperthreading is enabled these registers are shared between the co-located hyperthreads. Memory Ordering Buffers (MOB): The MOB is comprised of a 64-entry load and 36-entry store buffer. These buffers are used to track in-flight operations while waiting on the cache sub-system. The store buffer is a fully associative queue that can be searched for existing store operations, which have been queued when waiting on the L1 cache. These buffers enable our fast processors to run asynchronously while data is transferred to and from the cache sub-system. When the processor issues asynchronous reads and writes then the results can come back out-of-order. The MOB is used to disambiguate the ordering for compliance to the published memory model. Level 1 Cache: The L1 is a core-local cache split into separate 32K data and 32K instruction caches. Access time is 3 cycles and can be hidden as instructions are pipelined by the core for data already in the L1 cache. Level 2 Cache: The L2 cache is a core-local cache designed to buffer access between the L1 and the shared L3 cache. The L2 cache is 256K in size and acts as an effective queue of memory accesses between the L1 and L3. L2 contains both data and instructions. L2 access latency is 12 cycles. Level 3 Cache: The L3 cache is shared across all cores within a socket. The L3 is split into 2MB segments each connected to a ring-bus network on the socket. Each core is also connected to this ring-bus. Addresses are hashed to segments for greater throughput. Latency can be up to 38 cycles depending on cache size. Cache size can be up to 20MB depending on the number of segments, with each additional hop around the ring taking an additional cycle. 
The L3 cache is inclusive of all data in the L1 and L2 for each core on the same socket. This inclusiveness, at the cost of space, allows the L3 cache to intercept requests thus removing the burden from private core-local L1 & L2 caches. Main Memory: DRAM channels are connected to each socket with an average latency of ~65ns for socket local access on a full cache-miss. This is however extremely variable, being much less for subsequent accesses to columns in the same row buffer, through to significantly more when queuing effects and memory refresh cycles conflict. Four memory channels are aggregated together on each socket for throughput, and to hide latency via pipelining on the independent memory channels. NUMA: In a multi-socket server we have non-uniform memory access. It is non-uniform because the required memory may be on a remote socket having an additional 40ns hop across the QPI bus. Sandy Bridge is a major step forward for 2-socket systems over Westmere and Nehalem. With Sandy Bridge the QPI limit has been raised from 6.4GT/s to 8.0GT/s, and two lanes can be aggregated thus eliminating the bottleneck of the previous systems. For Nehalem and Westmere the QPI link is only capable of ~40% of the bandwidth that could be delivered by the memory controller for an individual socket. This limitation made accessing remote memory a choke point. In addition, the QPI link can now forward pre-fetch requests which previous generations could not. Associativity Levels Caches are effectively hardware based hash tables. The hash function is usually a simple masking of some low-order bits for cache indexing. Hash tables need some means to handle a collision for the same slot. The associativity level is the number of slots, also known as ways or sets, which can be used to hold a hashed version of an address. Having more levels of associativity is a trade off between storing more data vs. power requirements and time to search each of the ways. 
For Sandy Bridge the L1 and L2 are 8-way and the L3 is 12-way associative. Cache Coherence With some caches being local to cores, we need a means of keeping them coherent so all cores can have a consistent view of memory. The cache sub-system is considered the "source of truth" for mainstream systems. If memory is fetched from the cache it is never stale; the cache is the master copy when data exists in both the cache and main-memory. This style of memory management is known as write-back whereby data in the cache is only written back to main-memory when the cache-line is evicted because a new line is taking its place. An x86 cache works on blocks of data that are 64-bytes in size, known as a cache-line. Other processors can use a different size for the cache-line. A larger cache-line size reduces effective latency at the expense of increased bandwidth requirements. To keep the caches coherent the cache controller tracks the state of each cache-line as being in one of a finite number of states. The protocol Intel employs for this is MESIF; AMD employs a variant known as MOESI. Under the MESIF protocol each cache-line can be in 1 of the 5 following states: Modified: Indicates the cache-line is dirty and must be written back to memory at a later stage. When written back to main-memory the state transitions to Exclusive. Exclusive: Indicates the cache-line is held exclusively and that it matches main-memory. When written to, the state then transitions to Modified. To achieve this state a Request-For-Ownership (RFO) message is sent which involves a read plus an invalidate broadcast to all other copies. Shared: Indicates a clean copy of a cache-line that matches main-memory. Invalid: Indicates an unused cache-line. Forward: Indicates a specialised version of the shared state i.e. this is the designated cache which should respond to other caches in a NUMA system. 
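The states listed above can be sketched as a small state machine. This is a deliberately simplified model of a single cache-line, not Intel's actual protocol logic: it covers only the transitions described in the text (write-back, local write, request-for-ownership) plus invalidation when another cache takes ownership, and every name in it is made up for illustration.

```java
// Hypothetical, heavily simplified model of one cache-line under MESIF.
class MesifSketch {
    enum State { MODIFIED, EXCLUSIVE, SHARED, INVALID, FORWARD }

    /** A local write needs a Request-For-Ownership unless the line is already owned. */
    static boolean needsRfo(State current) {
        return current != State.MODIFIED && current != State.EXCLUSIVE;
    }

    /** After a local write (and any RFO it required) the line is dirty. */
    static State afterLocalWrite(State current) {
        return State.MODIFIED;
    }

    /** Per the text: a Modified line that is written back transitions to Exclusive. */
    static State afterWriteBack(State current) {
        return current == State.MODIFIED ? State.EXCLUSIVE : current;
    }

    /** Another cache's RFO invalidates this copy, whatever state it was in. */
    static State afterRemoteRfo(State current) {
        return State.INVALID;
    }

    public static void main(String[] args) {
        State line = State.SHARED;
        System.out.println("RFO needed before write: " + needsRfo(line));
        line = afterLocalWrite(line);
        line = afterWriteBack(line);
        System.out.println("state after write then write-back: " + line);
    }
}
```

The sketch leaves out read-sharing transitions and the choice of which sharer holds the Forward state, since the text does not spell those out.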
To transition from one state to another, a series of messages are sent between the caches to effect state changes. Previous to Nehalem for Intel, and Opteron for AMD, this cache coherence traffic between sockets had to share the memory bus which greatly limited scalability. These days the memory controller traffic is on a separate bus. The Intel QPI, and AMD HyperTransport, buses are used for cache coherence between sockets. The cache controller exists as a module within each L3 cache segment that is connected to the on-socket ring-bus network. Each core, L3 cache segment, QPI controller, memory controller, and integrated graphics sub-system are connected to this ring-bus. The ring is made up of 4 independent lanes for: request, snoop, acknowledge, and 32-bytes data per cycle. The L3 cache is inclusive in that any cache-line held in the L1 or L2 caches is also held in the L3. This provides for rapid identification of the core containing a modified line when snooping for changes. The cache controller for the L3 segment keeps track of which core could have a modified version of a cache-line it owns. If a core wants to read some memory, and it does not have it in a Shared, Exclusive, or Modified state; then it must make a read on the ring bus. It will then either be read from main-memory if not in the cache sub-systems, or read from L3 if clean, or snooped from another core if Modified. In any case the read will never return a stale copy from the cache sub-system, it is guaranteed to be coherent. Concurrent Programming If our caches are always coherent then why do we worry about visibility when writing concurrent programs? This is because within our cores, in their quest for ever greater performance, data modifications can appear out-of-order to other threads. There are 2 major reasons for this. Firstly, our compilers can generate programs that store variables in registers for relatively long periods of time for performance reasons, e.g. 
variables used repeatedly within a loop. If we need these variables to be visible across cores then the updates must not be register allocated. This is achieved in C by qualifying a variable as "volatile". Beware that C/C++ volatile is inadequate for telling the compiler to order other instructions. For this you need fences/barriers. The second major issue with ordering we have to be aware of is a thread could write a variable and then, if it reads it shortly after, could see the value in its store buffer which may be older than the latest value in the cache sub-system. This is never an issue for algorithms following the Single Writer Principle but is an issue for the likes of the Dekker and Peterson lock algorithms. To overcome this issue, and ensure the latest value is observed, the thread must wait for the store buffer to drain on that core. This can be achieved by issuing a fence instruction. The write of a volatile variable in Java, in addition to never being register allocated, is accompanied by a full fence instruction. This fence instruction on x86 has a significant performance impact by preventing progress on the issuing thread until the store buffer is drained. Fences on other processors can have more efficient implementations that simply put a marker in the store buffer for the search boundary, e.g. the Azul Vega does this. If you want to ensure memory ordering across Java threads when following the Single Writer Principle, and avoid the store fence, it is possible by using the j.u.c.Atomic(Int|Long|Reference).lazySet() method, as opposed to setting a volatile variable. The Fallacy Returning to the fallacy of "flushing the cache" as part of a concurrent algorithm. I think we can safely say that we never "flush" the CPU cache within our user space programs. 
I believe the source of this fallacy is the need to flush, mark or drain to a point, the store buffer for some classes of concurrent algorithms so the latest value can be observed on a subsequent load operation. For this we require a memory ordering fence and not a cache flush. Another possible source of this fallacy is that L1 caches, or the TLB, may need to be flushed based on address indexing policy on a context switch. ARM, previous to ARMv6, did not use address space tags on TLB entries thus requiring the whole L1 cache to be flushed on a context switch. Many processors require the L1 instruction cache to be flushed for similar reasons, in many cases this is simply because instruction caches are not required to be kept coherent. The bottom line is, context switching is expensive and a bit off topic, so in addition to the cache pollution of the L2, a context switch can also cause the TLB and/or L1 caches to require a flush. Intel x86 processors require only a TLB flush on context switch.
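As a small illustration of the lazySet() point made earlier, here is a sketch of a counter that follows the Single Writer Principle. The class name and the demo harness are assumptions; the only library call relied on is `AtomicLong.lazySet`, and the sketch makes no attempt to measure the fence cost discussed above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-writer counter; only one thread may call increment().
class SingleWriterCounterSketch {
    private final AtomicLong counter = new AtomicLong();

    /** Writer side: a plain read-modify-write is safe because there is exactly one writer. */
    void increment() {
        // lazySet publishes the store in order without the full fence of a volatile write
        counter.lazySet(counter.get() + 1);
    }

    /** Reader side: any thread may read the latest published value. */
    long read() {
        return counter.get();
    }

    /** Runs one writer thread to completion and returns the value seen afterwards. */
    static long runDemo(int increments) {
        SingleWriterCounterSketch c = new SingleWriterCounterSketch();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < increments; i++) {
                c.increment();
            }
        });
        writer.start();
        try {
            writer.join(); // join() makes all of the writer's stores visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return c.read();
    }

    public static void main(String[] args) {
        System.out.println("final count: " + runDemo(100_000));
    }
}
```

With more than one writer this pattern would lose updates; in that case an atomic increment such as `incrementAndGet()` would be needed instead.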
February 15, 2013
by Martin Thompson
· 11,503 Views · 3 Likes
The Reality of a Developer's Life — In GIFs, of Course
Want to check out what life is like as a developer? Check out this post for a series of GIFs demonstrating the dev life.
February 15, 2013
by Alex Soto
· 500,540 Views · 17 Likes
Using awk and Friends with Hadoop
Imagine you have a CSV file that you want to manipulate. Here's a sample file we can play with:
    lopez,charlie,2002,11,21
    parker,ward,1995,04,08
    henderson,russell,2007,10,01
Our goal is to transform this into the following form by combining the last three columns:
    lopez,charlie,20021121
    parker,ward,19950408
    henderson,russell,20071001
In Linux this would take all of two seconds (excuse the awkward awk command):
    shell$ awk -F"," '{ print $1","$2","$3$4$5 }' people.txt
What if you wanted to quickly do the same in HDFS, and let's assume you want to write the results back to HDFS? One approach would be to use the HDFS CLI to stream the inputs into awk, and stream the awk output back into HDFS. You could do this with the hadoop fs -cat and -put - options (note that adding a hyphen after put instructs the put command to stream data from standard input to HDFS):
    shell$ hadoop fs -cat people.txt | awk -F"," '{ print $1","$2","$3$4$5 }' | hadoop fs -put - people-coalesed.txt
By the way, if your input and output files are LZOP-compressed then this command would work:
    shell$ hadoop fs -cat people.txt.lzo | lzop -dc | awk -F"," '{ print $1","$2","$3$4$5 }' | \
        lzop -c | hadoop fs -put - people-coalesed.txt.lzo
This is great if your file isn't too large, but if it's multiple gigabytes in length then you probably want to harness the power of MapReduce to get this done in a jiffy! The words "in a jiffy" and "MapReduce" aren't commonly used together, so what do we do? Well, you could crack open Pig or Hive and write some custom user-defined functions, but this means you end up in Java, which we want to avoid. Hadoop Streaming comes to the rescue in these situations.
Let's first create our awk script which will be executed:
    shell$ cat people.awk
    #!/bin/awk -f
    BEGIN { FS = "," }
    { print $1","$2","$3$4$5 }
In Linux, if you make this awk script executable, you could execute it as follows:
    shell$ ./people.awk people.txt
In MapReduce-land we don't need to join data in this particular example, so we don't need to run any reducers. Call your awk script from mappers via Hadoop Streaming with this command:
    shell$ HADOOP_HOME=/usr/lib/hadoop
    shell$ ${HADOOP_HOME}/bin/hadoop \
        jar ${HADOOP_HOME}/contrib/streaming/*.jar \
        -D mapreduce.job.reduces=0 \
        -D mapred.reduce.tasks=0 \
        -input people.txt \
        -output people-coalesed \
        -mapper people.awk \
        -file people.awk
A few options in the Hadoop Streaming command are worth examining:
Finally, to get LZO into the picture you need to add -inputformat, -D mapred.output.compress and -D mapred.output.compression.codec arguments:
    shell$ HADOOP_HOME=/usr/lib/hadoop
    shell$ ${HADOOP_HOME}/bin/hadoop \
        jar ${HADOOP_HOME}/contrib/streaming/*.jar \
        -D mapreduce.job.reduces=0 \
        -D mapred.reduce.tasks=0 \
        -D mapred.output.compress=true \
        -D stream.map.input.ignoreKey=true \
        -D mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec \
        -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
        -input people.txt.lzo \
        -output people-coalesed \
        -mapper people.awk \
        -file people.awk
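For readers less familiar with awk, the per-record logic of the script above can be written out in Java as follows. This is only meant to make the transformation explicit, not to replace the streaming job; the class and method names are invented, and unlike awk (which would print empty fields) this sketch rejects records with fewer than five columns.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

// Hypothetical illustration of what print $1","$2","$3$4$5 does to one record.
class CoalesceDateSketch {

    /** Keeps the first two fields and concatenates fields three to five. */
    static String coalesce(String line) {
        String[] f = line.split(",", -1); // -1 keeps trailing empty fields
        if (f.length < 5) {
            throw new IllegalArgumentException("expected at least 5 fields: " + line);
        }
        return f[0] + "," + f[1] + "," + f[2] + f[3] + f[4];
    }

    /** Reads records from standard input and writes the transformed records to standard output. */
    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            System.out.println(coalesce(line));
        }
    }
}
```

Feeding it the sample file turns `lopez,charlie,2002,11,21` into `lopez,charlie,20021121`, matching the target format shown earlier.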
February 14, 2013
by Alex Holmes
· 13,147 Views · 1 Like
Introduction to JCache JSR 107
Resin has supported caching, session replication (another form of caching), and HTTP proxy caching in cluster environments for over ten years. When you use Resin caching, you are using a platform that combines the speed and scalability of custom services written in C, like Nginx, with the usability of Java and the industry-standard Java EE platform. JCache JSR 107 is a distributed cache that has a similar interface to the HashMap that you know and love. To be more specific, the Cache object in JCache looks like a java.util.concurrent.ConcurrentHashMap. In addition, JCache JSR 107 defines integration with CDI (as well as Spring and Guice). You can decorate services with interceptors that apply caching to the services just by defining annotations. Resin 4 has support for JCache, and JCache support is required for Java EE 7. Let's look at a small example to see how easy it is to get started with JCache.
    package hello.world;

    import javax.cache.Cache;
    import javax.cache.CacheBuilder;
    import javax.cache.CacheManager;
    import javax.cache.Caching;
    ...

    @WebServlet("/HelloServlet")
    public class HelloServlet extends HttpServlet {
        Cache<String, String> cache;

        public Cache<String, String> cache() {
            if (cache == null) {
                // building a cache
                CacheManager manager = Caching.getCacheManager("cacheManagerHello");
                CacheBuilder<String, String> builder = manager.createCacheBuilder("a");
                cache = builder.build();
            }
            return cache;
        }

        protected void doGet(HttpServletRequest request, HttpServletResponse response)
                throws ServletException, IOException {
            response.setContentType("text/html");
            response.getWriter().append(" ");
            String helloMessage = cache().get("hello message");
            if (helloMessage == null) {
                helloMessage = new StringBuilder(20)
                        .append("Hello World ! ")
                        .append(System.currentTimeMillis()).toString();
                cache().put("hello message", helloMessage); // putting results in the cache
            }
            response.getWriter().append(helloMessage);
            response.getWriter().append(" ");
        }
    }
The above works out fairly well, but what if we want to periodically change the helloMessage? Let's say we get 2,000 requests a second, but every 10 seconds or so we would like to regenerate the helloMessage. The message might be:
    Hello World ! 1358979745996
Later we would want it to change. If we wanted it to change every 10 seconds after it was last accessed, we would do this:
    cache = builder.setExpiry(ExpiryType.ACCESSED, new Duration(TimeUnit.SECONDS, 10)).build();
For this example, we want to change it every 10 seconds after it was last modified. We would set up the timeout on creation as follows:
    cache = builder.setExpiry(ExpiryType.MODIFIED, new Duration(TimeUnit.SECONDS, 10)).build();
This would go right in the cache method we defined earlier.
    public Cache<String, String> cache() {
        if (cache == null) {
            CacheManager manager = Caching.getCacheManager("cacheManagerHello");
            CacheBuilder<String, String> builder = manager.createCacheBuilder("b");
            cache = builder.setExpiry(ExpiryType.MODIFIED, new Duration(TimeUnit.SECONDS, 10)).build();
        }
        return cache;
    }
Resin's JCache implementation is built on top of Resin's distributed cache architecture. You get replication and data redundancy built in. Bill Digman is a Java EE / Servlet enthusiast and Open Source enthusiast who loves working with Caucho's Resin Servlet Container, a Java EE Web Profile Servlet Container.
Caucho's Resin OpenSource Servlet Container
Java EE Web Profile Servlet Container
Caucho's Resin 4.0 JCache blog post
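To make the difference between the two expiry policies concrete without depending on the JCache API used above, here is a small JDK-only sketch. `ExpiringCacheSketch` and its methods are hypothetical and are not how Resin or JCache implement expiry; the current time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-threaded cache illustrating ACCESSED versus MODIFIED expiry.
class ExpiringCacheSketch {
    enum ExpiryType { ACCESSED, MODIFIED }

    private static final class Entry {
        final String value;
        long stampMillis; // time of last write, or of last read under ACCESSED

        Entry(String value, long stampMillis) {
            this.value = value;
            this.stampMillis = stampMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final ExpiryType type;
    private final long ttlMillis;

    ExpiringCacheSketch(ExpiryType type, long ttlMillis) {
        this.type = type;
        this.ttlMillis = ttlMillis;
    }

    void put(String key, String value, long nowMillis) {
        entries.put(key, new Entry(value, nowMillis));
    }

    /** Returns the cached value, or null if it is absent or has expired. */
    String get(String key, long nowMillis) {
        Entry e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.stampMillis >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        if (type == ExpiryType.ACCESSED) {
            e.stampMillis = nowMillis; // every read restarts the clock
        }
        return e.value;
    }

    public static void main(String[] args) {
        ExpiringCacheSketch modified = new ExpiringCacheSketch(ExpiryType.MODIFIED, 10_000);
        modified.put("hello message", "Hello World !", 0);
        System.out.println("at 6s:  " + modified.get("hello message", 6_000));
        System.out.println("at 12s: " + modified.get("hello message", 12_000));
    }
}
```

Under steady traffic an ACCESSED policy keeps resetting the clock, so the entry may never expire; that is why the example above, which wants a fresh message roughly every 10 seconds, uses MODIFIED.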
February 13, 2013
by Bill Digman
· 48,935 Views · 1 Like
Synchronising Multithreaded Integration Tests
Testing threads is hard, very hard, and this makes writing good integration tests for multithreaded systems under test... hard. This is because in JUnit there's no built-in synchronisation between the test code, the object under test and any threads. This means that problems usually arise when you have to write a test for a method that creates and runs a thread. One of the most common scenarios in this domain is making a call to a method under test which starts a new thread running before returning. At some point in the future, when the thread's job is done, you need to assert that everything went well. Examples of this scenario could include asynchronously reading data from a socket or carrying out a long and complex set of operations on a database. For example, the ThreadWrapper class below contains a single public method: doWork(). Calling doWork() sets the ball rolling and at some point in the future, at the discretion of the JVM, a thread runs adding data to a database.
    public class ThreadWrapper {
        /**
         * Start the thread running so that it does some work.
         */
        public void doWork() {
            Thread thread = new Thread() {
                /**
                 * Run method adding data to a fictitious database
                 */
                @Override
                public void run() {
                    System.out.println("Start of the thread");
                    addDataToDB();
                    System.out.println("End of the thread method");
                }

                private void addDataToDB() {
                    // Dummy Code...
                    try {
                        Thread.sleep(4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            thread.start();
            System.out.println("Off and running...");
        }
    }
A straightforward test for this code would be to call the doWork() method and then check the database for the result. The problem is that, owing to the use of a thread, there's no co-ordination between the object under test, the test and the thread.
A common way of achieving some co-ordination when writing this kind of test is to put some kind of delay in between the call to the method under test and checking the results in the database, as demonstrated below:
    public class ThreadWrapperTest {
        @Test
        public void testDoWork() throws InterruptedException {
            ThreadWrapper instance = new ThreadWrapper();
            instance.doWork();
            Thread.sleep(10000);
            boolean result = getResultFromDatabase();
            assertTrue(result);
        }

        /**
         * Dummy database method - just return true
         */
        private boolean getResultFromDatabase() {
            return true;
        }
    }
In the code above there is a simple Thread.sleep(10000) between two method calls. This technique has the benefit of being incredibly simple; however it's also very risky. This is because it introduces a race condition between the test and the worker thread, as the JVM makes no guarantees about when threads will run. Often it'll work on a developer's machine only to fail consistently on the build machine. Even if it does work on the build machine it artificially lengthens the duration of the test; remember that quick builds are important. The only sure way of getting this right is to synchronise the two different threads, and one technique for doing this is to inject a simple CountDownLatch into the instance under test. In the example below I've modified the ThreadWrapper class's doWork() method, adding the CountDownLatch as an argument.
    public class ThreadWrapper {
        /**
         * Start the thread running so that it does some work.
         */
        public void doWork(final CountDownLatch latch) {
            Thread thread = new Thread() {
                /**
                 * Run method adding data to a fictitious database
                 */
                @Override
                public void run() {
                    System.out.println("Start of the thread");
                    addDataToDB();
                    System.out.println("End of the thread method");
                    countDown();
                }

                private void addDataToDB() {
                    try {
                        Thread.sleep(4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                private void countDown() {
                    if (isNotNull(latch)) {
                        latch.countDown();
                    }
                }

                private boolean isNotNull(Object obj) {
                    return obj != null;
                }
            };
            thread.start();
            System.out.println("Off and running...");
        }
    }
The Javadoc API describes a count down latch as: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier. A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times. A useful property of a CountDownLatch is that it doesn't require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.
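A test that waits on the latch might look like the sketch below. To keep it self-contained and runnable without JUnit, the worker is reduced to a plain thread with a short sleep and the check lives in an ordinary method; the class name, the 50 ms sleep and the 5 second timeout are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ThreadWrapper plus its test, using only the JDK.
class LatchTestSketch {

    /** Starts a worker that records its result and then counts the latch down. */
    static void doWork(CountDownLatch latch, AtomicBoolean done) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(50); // stand-in for the database work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.set(true);    // the "result" the test will check
            latch.countDown(); // signal the waiting test thread
        });
        thread.start();
    }

    /** Returns true if the worker finished, and its result is visible, before the timeout. */
    static boolean runAndWait() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean done = new AtomicBoolean(false);
        doWork(latch, done);
        try {
            // Blocks until countDown() is called or the timeout elapses
            boolean finished = latch.await(5, TimeUnit.SECONDS);
            return finished && done.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker finished before timeout: " + runAndWait());
    }
}
```

Using the timed `await` rather than the untimed one means a worker that never finishes fails the test after a bounded wait instead of hanging the build.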
The idea here is that the test code will never check the database for the results until the run() method of the worker thread has called latch.countDown(). This is because the test code thread is blocking at the call to latch.await(). latch.countDown() decrements the latch's count and, once this is zero, the blocking call to latch.await() returns and the test code continues executing, safe in the knowledge that any results which should be in the database are in the database. The test can then retrieve these results and make a valid assertion.

Obviously, the code above merely fakes the database connection and operations.

The thing is, you may not want to, or need to, inject a CountDownLatch directly into your code; after all, it's not used in production and it doesn't look particularly clean or elegant. One quick way around this is to make the doWork(CountDownLatch latch) method package private and expose it through a public doWork() method.

    import java.util.concurrent.CountDownLatch;

    import com.google.common.annotations.VisibleForTesting;

    public class ThreadWrapper {

        /**
         * Start the thread running so that it does some work.
         */
        public void doWork() {
            doWork(null);
        }

        @VisibleForTesting
        void doWork(final CountDownLatch latch) {
            Thread thread = new Thread() {

                /**
                 * Run method adding data to a fictitious database
                 */
                @Override
                public void run() {
                    System.out.println("Start of the thread");
                    addDataToDB();
                    System.out.println("End of the thread method");
                    countDown();
                }

                private void addDataToDB() {
                    try {
                        Thread.sleep(4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                private void countDown() {
                    if (isNotNull(latch)) {
                        latch.countDown();
                    }
                }

                // Checks the argument itself rather than always checking the latch
                private boolean isNotNull(Object obj) {
                    return obj != null;
                }
            };
            thread.start();
            System.out.println("Off and running...");
        }
    }

The code above uses Google's Guava @VisibleForTesting annotation to tell us that the visibility of the doWork(CountDownLatch latch) method has been relaxed slightly for testing purposes.
Now I realise that making a method package private for testing purposes is highly controversial; some people hate the idea, whilst others do it everywhere. I could write a whole blog on this subject (and may do one day), but for me it should be used judiciously, when there's no other choice, for example when you're writing characterisation tests for legacy code. If possible it should be avoided, but never ruled out. After all, tested code is better than untested code.

With this in mind, the next iteration of ThreadWrapper designs out the need for a method marked as @VisibleForTesting, together with the need to inject a CountDownLatch into your production code. The idea here is to use the Strategy Pattern and separate the Runnable implementation from the Thread. Hence, we have a very simple ThreadWrapper:

    public class ThreadWrapper {

        /**
         * Start the thread running so that it does some work.
         */
        public void doWork(Runnable job) {
            Thread thread = new Thread(job);
            thread.start();
            System.out.println("Off and running...");
        }
    }

and a separate job:

    public class DatabaseJob implements Runnable {

        /**
         * Run method adding data to a fictitious database
         */
        @Override
        public void run() {
            System.out.println("Start of the thread");
            addDataToDB();
            System.out.println("End of the thread method");
        }

        private void addDataToDB() {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

You'll notice that the DatabaseJob class doesn't use a CountDownLatch. How is it synchronised? The answer lies in the test code below...
    import static org.junit.Assert.assertTrue;

    import java.util.concurrent.CountDownLatch;

    import org.junit.Test;

    public class ThreadWrapperTest {

        @Test
        public void testDoWork() throws InterruptedException {
            ThreadWrapper instance = new ThreadWrapper();
            CountDownLatch latch = new CountDownLatch(1);
            DatabaseJobTester tester = new DatabaseJobTester(latch);
            instance.doWork(tester);
            latch.await();
            boolean result = getResultFromDatabase();
            assertTrue(result);
        }

        /**
         * Dummy database method - just return true
         */
        private boolean getResultFromDatabase() {
            return true;
        }

        private class DatabaseJobTester extends DatabaseJob {

            private final CountDownLatch latch;

            public DatabaseJobTester(CountDownLatch latch) {
                super();
                this.latch = latch;
            }

            @Override
            public void run() {
                super.run();
                latch.countDown();
            }
        }
    }

The test code above contains an inner class, DatabaseJobTester, which extends DatabaseJob. In this class the run() method has been overridden to include a call to latch.countDown() after our fake database has been updated via the call to super.run(). This works because the test passes a DatabaseJobTester instance to the doWork(Runnable job) method, adding in the required thread testing capability. The idea of sub-classing objects under test is something I've mentioned before in one of my blogs on testing techniques, and it is a really powerful technique.

So, to conclude:

  • Testing threads is hard.
  • Testing anonymous inner classes is almost impossible.
  • Using Thread.sleep(...) is a risky idea and should be avoided.
  • You can refactor out these problems using the Strategy Pattern.
  • Programming is the Art of Making the Right Decision

...and that relaxing a method's visibility for testing may or may not be a good idea, but more on that later...

The code above is available on Github in the captain debug repository (git://github.com/roghughe/captaindebug.git) under the unit-testing-threads project.
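One possible refinement, which is not part of the original code: if the worker never counts down (for example because run() throws), a plain latch.await() blocks forever and hangs the build. The sketch below assumes a hypothetical SlowJob in place of DatabaseJob and uses the timed await(long, TimeUnit) overload of CountDownLatch, which returns false if the timeout expires first. The countDown() call sits in a finally block so the latch is released even when the job fails.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a bounded wait; all names here are illustrative only. */
class TimedLatchSketch {

    /** Stand-in for DatabaseJob with a configurable fake delay. */
    static class SlowJob implements Runnable {
        private final long millis;

        SlowJob(long millis) {
            this.millis = millis;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(millis); // pretend to write to the database
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Test-only subclass in the style of DatabaseJobTester. */
    static class SlowJobTester extends SlowJob {
        private final CountDownLatch latch;

        SlowJobTester(long millis, CountDownLatch latch) {
            super(millis);
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                super.run();
            } finally {
                latch.countDown(); // always release the waiting thread
            }
        }
    }

    /** Returns true if the job signalled completion before the timeout expired. */
    static boolean finishesWithin(long jobMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread thread = new Thread(new SlowJobTester(jobMillis, latch));
        thread.setDaemon(true); // a job that overruns must not keep the JVM alive
        thread.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a JUnit test the boolean returned by the timed await can be passed straight to assertTrue, so a stuck worker shows up as a failed assertion instead of a hung build.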
February 13, 2013
by Roger Hughes
· 13,946 Views · 12 Likes
How to Return Dictionary As a Result From a LINQ Query in C#?
This article provides a code snippet and explains how to return a Dictionary as the result of a LINQ query in C#. There are times when you want to retrieve only the (distinct) ID and the name from a database table using LINQ. In scenarios like this, you can use the ToDictionary method to place the necessary properties into a dictionary and return them. Below is sample source code demonstrating the usage of the ToDictionary method in a LINQ query:

    public class BlockbusterMovie
    {
        public string Name { get; set; }
        public int ID { get; set; }
    }

    public class BlockbusterMovies : List<BlockbusterMovie>
    {
        public BlockbusterMovies()
        {
            Add(new BlockbusterMovie { Name = "Vishwaroopam", ID = 1 });
            Add(new BlockbusterMovie { Name = "Endhiran", ID = 2 });
            Add(new BlockbusterMovie { Name = "Thuppaki", ID = 3 });
            Add(new BlockbusterMovie { Name = "Mankatha", ID = 4 });
        }
    }

The BlockbusterMovies class holds the collection of movies, which is used in the code snippet below to return a dictionary keyed on the ID with the Name as the value.

    private void Form1_Load(object sender, EventArgs e)
    {
        List<BlockbusterMovie> movies = new BlockbusterMovies();
        var LstMovies = movies.ToDictionary(Field => Field.ID, mc => mc.Name);
    }
February 13, 2013
by Senthil Kumar
· 53,358 Views