DZone

Ricky Ho

Architect at Self

San Jose, US

Joined May 2008

About

I am a software architect working in the service hosting area. I am interested in and specialize in SaaS, cloud computing, and parallel processing.

Stats

Reputation: 83
Pageviews: 1.3M
Articles: 16
Comments: 32

Articles

Reinforcement Learning Overview
And no, we're not talking about Pavlov's dogs here. Learn about the reinforcement learning aspect of machine learning and the key algorithms that are involved!
August 29, 2017
· 6,902 Views · 4 Likes
Big Data Processing in Spark
In the traditional three-tier architecture, data processing is performed by the application server, while the data itself is stored in the database server. The application server and database server are typically two different machines, so the processing cycle proceeds as follows:

1. The application server sends a query to the database server to retrieve the necessary data.
2. The application server performs processing on the received data.
3. The application server saves the changed data back to the database server.

In this traditional data processing paradigm, we move data to the code. It can be depicted as follows ...

Then the big data phenomenon arrived. Because the data volume is huge, it cannot be held by a single database server; big data is typically partitioned and stored across many physical DB server machines. Correspondingly, application servers need to be added to increase the processing power. However, as we increase the number of app servers and DB servers, more data needs to be transferred back and forth across the network during each processing cycle, up to the point where the network becomes the major bottleneck.

Moving Code to Data

To overcome the network bottleneck, we need a new computing paradigm: instead of moving data to the code, we move the code to the data and perform the processing where the data is stored. Notice the change in program structure:

- Program execution starts at a driver, which orchestrates the execution happening remotely across many worker servers within a cluster.
- Data is no longer transferred to the driver program; the driver holds a data reference in its variables rather than the data itself. The data reference is essentially an id used to locate the corresponding data residing on the database servers.
- Code is shipped from the driver program to the database servers, where the execution happens, and the data is modified at the database server without ever leaving the server machine.
Finally, the program requests a save of the modified data. Since the modified data resides on the database servers, no data transfer happens over the network. By moving the code to the data, the volume of data transferred over the network is significantly reduced. This is an important paradigm shift for big data processing. In the following sections, I will use Apache Spark to illustrate how this big data processing paradigm is implemented.

RDD

The Resilient Distributed Dataset (RDD) is how Spark implements the data-reference concept. An RDD is a logical reference to a dataset that is partitioned across many server machines in the cluster. To make a clear distinction between the data reference and the data itself, a Spark program is organized as a sequence of execution steps, each of which is either a "transformation" or an "action".

Programming Model

A typical program is organized as follows:

1. From the environment variable "context", create some initial RDD objects (data references).
2. Transform these initial RDDs to create more RDDs. A transformation is expressed in terms of functional programming: a code block is shipped from the driver program to multiple remote worker servers, each of which holds a partition of the RDD. A variable appearing inside the code block can be either an item of the RDD or a local variable of the driver program, which gets serialized over to the worker machine. After the code (and the copies of the serialized variables) is received by a remote worker server, it is executed there against the items of the RDD partition residing on that worker. Note that the result of a transformation is a brand-new RDD; the original RDD is not mutated.
3. Finally, an RDD object (the data reference) needs to be materialized. This is achieved through an "action", which either dumps the RDD into storage or returns its value to the driver program.
Here is a word count example:

# Get initial RDD from the context
file = spark.textFile("hdfs://...")

# Three consecutive transformations of the RDD
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

# Materialize the RDD using an action
counts.saveAsTextFile("hdfs://...")

When the driver program starts its execution, it builds up a graph whose nodes are RDDs and whose edges are transformation steps. However, no execution happens in the cluster until an action is encountered. At that point, the driver program ships the execution graph, along with the code blocks, to the cluster, where every worker server gets a copy.

The execution graph is a DAG, and each DAG is an atomic unit of execution:

- Each source node (no incoming edge) is either an external data source or driver memory.
- Each intermediate node is an RDD.
- Each sink node (no outgoing edge) is either an external data source or driver memory.
- A green edge connecting to an RDD represents a transformation; a red edge connecting to a sink node represents an action.

Data Shuffling

Although we ship the code to the worker servers where the data processing happens, data movement cannot be completely eliminated. For example, if the processing requires data residing in different partitions to be grouped first, then we need to shuffle data among the worker servers. Spark carefully distinguishes two types of transformations. A "narrow transformation" is one whose processing logic depends only on data already residing in the partition, so no data shuffling is necessary; examples include filter(), sample(), map(), and flatMap(). A "wide transformation" is one whose processing logic depends on data residing in multiple partitions, so data shuffling is needed to bring that data together in one place; examples include groupByKey() and reduceByKey().
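The lazy-evaluation behavior described above can be sketched in plain Python. The ToyRDD class below is an illustration I made up, not Spark's actual implementation or API: each transformation merely records a step in a plan, and nothing executes until the collect() action is called.

```python
# ToyRDD: a made-up, single-machine illustration of lazy RDD semantics.
# Transformations append to a recorded plan; collect() is the action that
# finally runs the whole pipeline.

class ToyRDD:
    def __init__(self, data, plan=None):
        self._data = data          # pretend this lives on remote workers
        self._plan = plan or []    # recorded transformation steps

    def flat_map(self, fn):
        return ToyRDD(self._data, self._plan + [("flat_map", fn)])

    def map(self, fn):
        return ToyRDD(self._data, self._plan + [("map", fn)])

    def reduce_by_key(self, fn):
        return ToyRDD(self._data, self._plan + [("reduce_by_key", fn)])

    def collect(self):             # the action: only now does work happen
        items = list(self._data)
        for kind, fn in self._plan:
            if kind == "flat_map":
                items = [y for x in items for y in fn(x)]
            elif kind == "map":
                items = [fn(x) for x in items]
            elif kind == "reduce_by_key":
                acc = {}
                for k, v in items:
                    acc[k] = fn(acc[k], v) if k in acc else v
                items = sorted(acc.items())
        return items

lines = ToyRDD(["a b a", "b c"])
counts = (lines.flat_map(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduce_by_key(lambda a, b: a + b))
print(counts.collect())   # [('a', 2), ('b', 2), ('c', 1)]
```

Note that building `counts` does no work at all; the three transformations only lengthen the recorded plan, mirroring how Spark defers execution until an action arrives.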
Joining two RDDs can also affect the amount of data being shuffled. Spark provides two ways to join data. In a shuffle join, the items of the two RDDs with the same key are redistributed to the same partition; in other words, the items of both RDDs are shuffled across the worker servers. Besides the shuffle join, Spark provides an alternative called a broadcast join, in which one of the RDDs is broadcast and copied to every partition. When one RDD is significantly smaller than the other, a broadcast join reduces network traffic, because only the small RDD needs to be copied to all worker servers while the large RDD doesn't need to be shuffled at all.

In some cases, transformations can be reordered to reduce the amount of data shuffling. Below is an example of a JOIN between two huge RDDs followed by a filter. Plan 1 is the naive implementation that follows the given order: it first joins the two huge RDDs and then applies the filter to the join result. This causes a big data shuffle, because both RDDs are huge even though the result after filtering is small. Plan 2 offers a smarter approach using the "push-down predicate" technique: we first apply the filter to both RDDs before joining them. Since the filter reduces the number of items in each RDD significantly, the join becomes much cheaper.

Execution Planning

As explained above, data shuffling incurs the most significant cost in the overall data processing flow. Spark provides a mechanism that generates an execution plan from the DAG to minimize the amount of data shuffling:

1. Analyze the DAG to determine the order of transformations. Notice that we start from the action (the terminal node) and trace back to all dependent RDDs.
2. To minimize data shuffling, group the narrow transformations together into a "stage", within which all transformation tasks can be performed inside a partition and no data shuffling is needed.
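The push-down-predicate effect can be sketched with toy data in plain Python (the datasets, sizes, and filter here are all made up for illustration). The point is simply to count how many records each plan must move into the join:

```python
# Toy illustration of predicate push-down: filtering before the join
# drastically reduces the number of records that must be co-located.

left  = [(k, "L") for k in range(1000)]   # pretend: a huge RDD
right = [(k, "R") for k in range(1000)]   # pretend: another huge RDD
keep  = lambda k: k % 100 == 0            # the filter we eventually apply

# Plan 1: join first, filter afterwards -- every record joins.
joined = [(k, lv, rv) for k, lv in left for k2, rv in right if k == k2]
plan1_shuffled = len(left) + len(right)   # 2000 records moved to the join
plan1_result = [row for row in joined if keep(row[0])]

# Plan 2: push the predicate down -- filter each side before joining.
small_left  = [(k, v) for k, v in left if keep(k)]
small_right = [(k, v) for k, v in right if keep(k)]
plan2_shuffled = len(small_left) + len(small_right)   # only 20 records moved
plan2_result = [(k, lv, rv) for k, lv in small_left
                for k2, rv in small_right if k == k2]

print(plan1_shuffled, plan2_shuffled)                # 2000 20
assert sorted(plan1_result) == sorted(plan2_result)  # same answer, less data
```

Both plans produce identical results; the only difference is how much data participates in the expensive join step.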
The transformations become tasks that are chained together within a stage. A wide transformation sits at the boundary between two stages and requires data to be shuffled to different worker servers. When a stage finishes its execution, it persists its data into files on local disk (one per partition); worker nodes of the subsequent stage then come to pick up these files, and this is where the data shuffling actually happens. Below is an example of how execution planning turns the DAG into an execution plan involving stages and tasks.

Reliability and Fault Resiliency

Since the DAG defines deterministic transformation steps between the different partitions of data in each RDD, fault recovery is very straightforward. Whenever a worker server crashes during the execution of a stage, another worker server can simply re-execute the stage from the beginning by pulling the input data from its parent stage, whose output is stored in local files. If the result of the parent stage is not accessible (e.g. the worker server holding the files is lost), the parent stage needs to be re-executed as well. Think of the computation as a lineage of transformation steps: any failure of a step triggers a restart of execution from its last available step.

Since the DAG itself is an atomic unit of execution, all RDD values are forgotten after the DAG finishes its execution. So after the driver program finishes an action (which executes a DAG to completion), if the program accesses an RDD again in a subsequent statement, the RDD needs to be recomputed from its dependencies. To reduce this repetitive processing, Spark provides a caching mechanism that remembers RDDs in worker server memory (or on local disk). Once the execution planner finds that an RDD is already cached in memory, it uses the cached RDD right away without tracing back to its parent RDDs; in effect, the DAG is pruned once we reach an RDD that is in the cache.
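The recompute-versus-cache trade-off can be sketched as a toy model in plain Python (this is my own illustration of the idea, not Spark internals): evaluation walks back up the lineage to an expensive source unless a cache short-circuits it.

```python
# Toy model of lineage recomputation vs. caching: each evaluation re-runs
# the whole lineage from the source unless a cache entry prunes the walk.

compute_calls = {"count": 0}

def source():
    compute_calls["count"] += 1      # expensive recomputation happens here
    return list(range(10))

def lineage_eval(transforms, cache=None):
    if cache is not None and "result" in cache:
        return cache["result"]       # cache hit: prune the lineage walk
    data = source()
    for fn in transforms:
        data = [fn(x) for x in data]
    if cache is not None:
        cache["result"] = data
    return data

transforms = [lambda x: x * 2]

# Without caching, every action re-runs the whole lineage.
lineage_eval(transforms)
lineage_eval(transforms)
print(compute_calls["count"])        # 2

# With caching, the second action reuses the stored result.
cache = {}
lineage_eval(transforms, cache)
lineage_eval(transforms, cache)
print(compute_calls["count"])        # 3  (only one more source() call)
```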
Overall, Apache Spark provides a powerful framework for big data processing. Thanks to the caching mechanism that holds previous computation results in memory, Spark significantly outperforms Hadoop because it doesn't need to persist all the data to disk for each round of parallel processing. Although it is still very new, I think Spark will take off as the mainstream approach for processing big data.
March 13, 2015
· 22,824 Views · 2 Likes
Lambda Architecture Principles
"Lambda Architecture" (introduced by Nathan Marz) has gained a lot of traction recently. Fundamentally, it is a set of design patterns for dealing with the batch and real-time data processing workflows that fuel many organizations' business operations. Although I don't think any novel ideas have been introduced, it is the first time these principles have been outlined in such a clear and unambiguous manner. In this post, I'd like to summarize the key principles of the Lambda architecture, focusing more on the underlying design principles and less on the choice of implementation technologies, where my preferences may differ from Nathan's.

One important distinction of the Lambda architecture is the clear separation between the batch processing pipeline (i.e. the Batch Layer) and the real-time processing pipeline (i.e. the Real-time Layer). This separation provides a means to localize and isolate the complexity of handling data updates. To handle real-time queries, the Lambda architecture provides a mechanism (i.e. the Serving Layer) to merge data from the Batch Layer and the Real-time Layer and return the latest information to the user.

Data Source Entry

At the very beginning, data flows into the Lambda architecture as follows:

- Transaction data starts streaming in from OLTP systems during business operations. The ingestion can be materialized as records in OLTP systems, text lines in application log files, incoming API calls, or an event queue (e.g. Kafka).
- This transaction data stream is replicated and fed into both the Batch Layer and the Real-time Layer.

Here is an overall architecture diagram for Lambda.

Batch Layer

For storing the ground truth, the "master dataset" is the most fundamental store: it captures every basic event that happens. It stores data in its most "raw" form (and hence at the finest granularity), from which any perspective of the data can be computed at any given point in time.
As long as we maintain the correctness of the master dataset, every data view derived from it will be automatically correct. Since maintaining this correctness is crucial, the master dataset is made "immutable" to avoid maintenance complexity: data can only be appended, while updates and deletes are disallowed. Disallowing changes to existing data completely avoids the complexity of handling conflicting concurrent updates.

Here is a conceptual schema of how the master dataset can be structured. The center green table represents the old, traditional way of storing data in an RDBMS. The surrounding blue tables illustrate the schema of the master dataset, with some key highlights:

- Data is partitioned by column and stored in different tables; columns that are closely related can be stored in the same table.
- NULL values are not stored.
- Each data record is associated with the timestamp from which the record is valid.

Notice that every piece of data is tagged with the timestamp at which the data changed (or, more precisely, at which a change record representing the modification was created). The latest state of an object can be retrieved by extracting the version of the object with the largest timestamp.

Although the master dataset stores data at the finest granularity and can therefore be used to compute the result of any query, such computation usually takes a long time when it starts from this raw form. To speed up query processing, various intermediate forms of the data (called batch views) that align more closely with the queries are generated periodically. These batch views (rather than the original master dataset) are what serve real-time query processing. To generate these batch views, the Batch Layer uses a massively parallel, brute-force approach to process the original master dataset.
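The append-only, timestamped structure can be sketched in a few lines of Python (the record layout and names here are a made-up illustration, not a prescribed schema): changes are only ever appended, and the current state of an object is the change record with the largest timestamp.

```python
# A minimal sketch of an immutable master dataset: append-only change
# records, each tagged with the timestamp at which it became valid.

master_dataset = []   # append-only list of (timestamp, object_id, field, value)

def append_change(ts, object_id, field, value):
    master_dataset.append((ts, object_id, field, value))   # no update/delete

def latest_value(object_id, field):
    changes = [(ts, v) for ts, oid, f, v in master_dataset
               if oid == object_id and f == field]
    return max(changes)[1] if changes else None   # largest timestamp wins

append_change(1, "user42", "city", "San Jose")
append_change(5, "user42", "city", "Palo Alto")   # a later change, not an overwrite
print(latest_value("user42", "city"))   # Palo Alto
```

Because the earlier record is never modified, any view (e.g. "where did user42 live at time 3?") can still be reconstructed from the same dataset.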
Notice that since the data in the master dataset is timestamped, the candidate data for processing can be identified simply as the records whose timestamps are later than the last round of batch processing. Although less efficient, the Lambda architecture advocates that at each round of batch view generation, the previous batch view should simply be discarded and the new batch view computed from scratch from the master dataset. This simple-minded, compute-from-scratch approach has the nice property of stopping error propagation (since errors cannot accumulate), but the processing is not optimized and may take a long time to finish. This can increase the "staleness" of the batch view.

Real-time Layer

As discussed above, generating a batch view requires scanning a large volume of the master dataset, which can take hours. The batch view will therefore be stale for at least the processing duration (i.e. between the start and end of the batch processing), but the maximum staleness can be as long as the period between the end of one batch run and the end of the next (i.e. the batch cycle). The following diagram illustrates this staleness.

Even while the batch view is stale, business operates as usual and transaction data streams in continuously. To answer the user's queries with the latest, up-to-date information, these business transaction records need to be captured and merged into the real-time view; this is the responsibility of the Real-time Layer. To keep the latency of information availability close to zero, the merge has to be done incrementally, so that no batching delays the processing. This makes the real-time view update very different from the batch view update, which can tolerate high latency. The end goal is that any latest information not yet captured in the batch view is made available in the real-time view.
The logic of the incremental merge into the real-time view is application specific. As a common use case, let's say we want to compute a set of summary statistics (e.g. mean, count, max, min, sum, standard deviation, percentiles) of the transaction data since the last batch view update. To update the sum, we can simply add the new transaction amount to the existing sum and write the new sum back to the real-time view. To update the mean, we can multiply the existing count by the existing mean, add the new transaction amount, and then divide by the existing count plus one.

Implementing this logic requires READing data from the real-time view, performing the merge, and WRITing the data back. This means the real-time serving DB (which hosts the real-time view) must support both random reads and random writes. Fortunately, since the real-time view only needs to store data up to one batch cycle old, its scale is limited. Once a batch view update is completed, the Real-time Layer discards the data in the real-time serving DB whose timestamps are earlier than that batch run. This not only limits the data volume of the real-time serving DB but also allows any data inconsistency in the real-time view to be cleaned up eventually, which drastically reduces the need for a sophisticated multi-user, large-scale DB. Many DB systems that support multi-user random read/write can be used for this purpose.

Serving Layer

The Serving Layer is responsible for hosting the batch view (in the batch serving database) as well as the real-time view (in the real-time serving database). Because of their very different access patterns, the batch serving DB has quite different characteristics from the real-time serving DB. As mentioned above, while the batch serving DB must support efficient random reads over a large data volume, it doesn't need to support random writes, because data is only ever bulk-loaded into it.
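The incremental merge described above can be sketched directly (the view layout and field names are made up for illustration): the real-time view holds running statistics, and each new transaction is folded in with a read-modify-write instead of rescanning all transactions.

```python
# Sketch of the incremental merge: keep running (count, sum, mean)
# statistics and update them in place for each new transaction.

realtime_view = {"count": 0, "sum": 0.0, "mean": 0.0}

def merge_transaction(view, amount):
    new_sum = view["sum"] + amount                     # sum: just add
    new_count = view["count"] + 1
    # mean: recover the old total via count * mean, add, divide by count + 1
    new_mean = (view["count"] * view["mean"] + amount) / new_count
    view.update(count=new_count, sum=new_sum, mean=new_mean)

for amount in [10.0, 20.0, 30.0]:
    merge_transaction(realtime_view, amount)

print(realtime_view)   # {'count': 3, 'sum': 60.0, 'mean': 20.0}
```

Each update is a READ of the view followed by a WRITE back, which is exactly why the real-time serving DB needs to support both random reads and random writes.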
On the other hand, the real-time serving DB is incrementally (and continuously) updated by the Real-time Layer, and therefore needs to support both random reads and random writes.

To keep the batch serving DB up to date, the Serving Layer periodically checks the Batch Layer's progress to determine whether a new round of batch view generation has finished. If so, it bulk-loads the new batch view into the batch serving DB. After the bulk load completes, the batch serving DB contains the latest version of the batch view, and the data in the real-time view that it now covers has expired and can therefore be deleted. The Serving Layer orchestrates these processes. This purge is especially important for keeping the real-time serving DB small, which limits the complexity of handling real-time, concurrent reads and writes.

To process a real-time query, the Serving Layer splits the incoming query into two sub-queries, forwards them to both the batch serving DB and the real-time serving DB, applies application-specific logic to combine/merge their results, and forms a single response to the query. Since the data in the real-time view and the batch view differ by timestamp, the combine/merge is typically done by concatenating the results; in case of any conflict (same timestamp), the value from the batch view overwrites the one from the real-time view.

Final Thoughts

By separating responsibilities into different layers, the Lambda architecture can leverage optimization techniques specifically designed for each layer's constraints. For example, the Batch Layer focuses on large-scale data processing using a simple, start-from-scratch approach without worrying about processing latency, while the Real-time Layer covers where the Batch Layer leaves off, focusing on low-latency merging of the latest information without needing to worry about large scale.
Finally, the Serving Layer is responsible for stitching together the batch view and the real-time view to provide the final, complete picture. The clear demarcation of responsibilities also enables a different technology stack to be used at each layer, tailored more closely to the organization's specific business needs. Nevertheless, using two very different mechanisms to update the batch view (start-from-scratch) and the real-time view (incremental merge) requires two different algorithm implementations and code bases to handle the same type of data. This increases the code maintenance effort and can be considered the price to pay for bridging the fundamental gap between the needs for "scalability" and "low latency".

Nathan's Lambda architecture also introduces a set of candidate technologies that he has developed and used in his past projects (e.g. Hadoop for storing the master dataset and generating the batch view, ElephantDB for the batch serving DB, Cassandra for the real-time serving DB, and Storm for generating the real-time view). The beauty of the Lambda architecture is that the choice of technologies is completely decoupled from the design, so I intentionally do not describe any of them in detail in this post. I have my own favorites, which differ somewhat and will be covered in future posts.
August 20, 2014
· 11,454 Views
Estimating Statistics via Bootstrapping and Monte Carlo Simulation
We often want to estimate some "statistic" (e.g. average income, 95th-percentile height, variance of weight, etc.) of a population. Since it would be too tedious to enumerate all members of the whole population, for efficiency we randomly pick a number of samples from the population and compute the statistic of the sample set as an estimate of the corresponding population statistic.

We understand that an estimate obtained this way (via random sampling) can deviate from the population value. Therefore, in addition to our estimated statistic, we also report a "standard error" (how far our estimate may deviate from the actual population statistic) or a "confidence interval" (a lower and upper bound on the statistic which we are confident contains the true value). The challenge is how to estimate the standard error or the confidence interval. A straightforward way is to repeat the sampling exercise many times, each time creating a different sample set from which we compute one estimate, and then look across the estimates from the different sample sets to derive the standard error and confidence interval. But what if collecting data for a new sample set is expensive, or the population is for some reason no longer accessible after we collected our first sample set? Bootstrapping provides a way to address this.

Bootstrapping

Instead of creating additional sample sets from the population, we create additional sample sets by re-sampling data (with replacement) from the original sample set. Each created sample set follows the same data distribution as the original sample set, which in turn follows the population. R provides a nice "boot" library to do this.
> library(boot)
> # Generate a population
> population.weight <- rnorm(100000, 160, 60)
> # Let's say we care about the ninetieth percentile
> quantile(population.weight, 0.9)
     90%
236.8105
> # We create our first sample set of 500 samples
> sample_set1 <- sample(population.weight, 500)
> # Here is our sample statistic of the ninetieth percentile
> quantile(sample_set1, 0.9)
     90%
232.3641
> # Notice that the sample statistic deviates from the population statistic.
> # We want to estimate how big this deviation is by using bootstrapping.
> # First, define the function that computes the statistic
> ninety_percentile <- function(x, idx) {return(quantile(x[idx], 0.9))}
> # Bootstrapping will call this function many times with different idx
> boot_result <- boot(data=sample_set1, statistic=ninety_percentile, R=1000)
> boot_result

ORDINARY NONPARAMETRIC BOOTSTRAP

Call:
boot(data = sample_set1, statistic = ninety_percentile, R = 1000)

Bootstrap Statistics :
    original     bias    std. error
t1* 232.3641   2.379859     5.43342
> plot(boot_result)
> boot.ci(boot_result, type="bca")
BOOTSTRAP CONFIDENCE INTERVAL CALCULATIONS
Based on 1000 bootstrap replicates

CALL :
boot.ci(boot.out = boot_result, type = "bca")

Intervals :
Level       BCa
95%   (227.2, 248.1 )
Calculations and Intervals on Original Scale

Here is the visual output of the bootstrap plot.

Bootstrapping is a powerful simulation technique for estimating any statistic in an empirical way. It is also non-parametric: it doesn't assume any model or parameters, and just uses the original sample set to estimate the statistic. If, on the other hand, we assume a certain distribution model and want to see the distribution of some statistic under it, Monte Carlo simulation provides a powerful way to do this.

Monte Carlo Simulation

The idea is pretty simple: based on a particular distribution function (defined by specific model parameters), we generate many sets of samples.
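For readers who don't use R, the same bootstrap idea can be sketched with just the Python standard library (the percentile helper and all parameters here are my own simplified illustration; it uses a plain percentile interval rather than the BCa interval the boot package computes):

```python
import random
import statistics

random.seed(7)

def percentile(data, q):
    s = sorted(data)
    return s[min(int(q * len(s)), len(s) - 1)]   # simple, non-interpolated

# One observed sample set (standing in for sample_set1 in the R session)
sample_set = [random.gauss(160, 60) for _ in range(500)]
observed = percentile(sample_set, 0.9)

# Bootstrap: draw new sample sets WITH replacement from the original one,
# recomputing the statistic on each replicate.
replicates = sorted(
    percentile(random.choices(sample_set, k=len(sample_set)), 0.9)
    for _ in range(1000)
)

std_error = statistics.stdev(replicates)
ci_low, ci_high = replicates[25], replicates[974]   # crude 95% percentile interval
print(round(observed, 1), round(std_error, 1),
      (round(ci_low, 1), round(ci_high, 1)))
```

The spread of the 1000 replicate statistics stands in for the spread we would have seen across genuinely independent sample sets, which is exactly the quantity the standard error describes.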
We compute the statistic of each sample set and see how the statistic is distributed across the different sample sets. For example, given a normally distributed population, what is the probability distribution of the maximum of 5 randomly chosen samples?

> sample_stats <- rep(0, 1000)
> for (i in 1:1000) {
+     sample_stats[i] <- max(rnorm(5))
+ }
> mean(sample_stats)
[1] 1.153008
> sd(sample_stats)
[1] 0.6584022
> par(mfrow=c(1,2))
> hist(sample_stats, breaks=30)
> qqnorm(sample_stats)
> qqline(sample_stats)

Here is the distribution of the "max of 5" statistic, which shows some right skewness.

Bootstrapping and Monte Carlo simulation are powerful tools for estimating statistics in an empirical manner, especially when we don't have an analytic solution.
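The same Monte Carlo experiment can be sketched in Python (again a standard-library illustration of the idea, with a larger simulation count than the R session uses):

```python
import random
import statistics

random.seed(42)

# Simulate the statistic max(5 standard-normal draws) many times and look
# at the distribution of the statistic itself.
sample_stats = [max(random.gauss(0, 1) for _ in range(5))
                for _ in range(100000)]

# The simulated mean and sd of the statistic; with this many replicates
# they land close to the theoretical values (mean of the max of 5
# standard normals is about 1.16).
print(round(statistics.mean(sample_stats), 2))
print(round(statistics.stdev(sample_stats), 2))
```

Nothing here depends on the statistic being the maximum; swapping in any other function of the 5 draws gives the Monte Carlo distribution of that statistic instead.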
March 21, 2014
· 4,650 Views
Escape Local Optimum Trap
Optimization is a very common technique in computer science and machine learning for searching for the best (or a good enough) solution. Optimization itself is a big topic that involves a wide range of mathematical techniques for different scenarios. In this post, I will focus on local search, a very popular technique that searches for an optimal solution through a series of greedy local moves. The general setting of local search is as follows:

1. Define an objective function
2. Pick an initial starting point
3. Repeat
   3.1 Find a neighborhood
   3.2 Locate a subset of neighbors that are candidate moves
   3.3 Select a candidate from the candidate set
   3.4 Move to the candidate

One requirement is that the optimal solution must be reachable by a sequence of moves. Usually this requires a proof that any arbitrary state is reachable from any other arbitrary state through a sequence of moves. Notice that there are many possible strategies for each of steps 3.1, 3.2, and 3.3. For example, one strategy examines all members of the neighborhood, picks the best one (in terms of the objective function), and moves there. Another strategy randomly picks a member of the neighborhood and moves to it only if it is better than the current state. Regardless of the strategy, the general theme is to move towards members that improve the objective function, hence the greedy nature of this algorithm.

One downside of this algorithm is that it can get trapped in a local optimum: a state that is the best candidate within its neighborhood, but not the best from a global perspective. In the following, we'll explore a couple of meta-heuristic techniques that can mitigate the local optimum trap.

Multiple Rounds

We simply conduct k rounds of local search, each round producing a local optimum, and then pick the best of these k local optima.
Simulated Annealing

This strategy involves a dynamic combination of exploitation (moving to a better neighbor) and exploration (randomly walking to a worse neighbor). The algorithm works as follows:

1. Pick an initial starting point
2. Repeat until the termination condition
   2.1 Within the neighborhood, pick a random member
   2.2 If the neighbor is better than the current state,
          move to the neighbor
       else
          with probability exp(-(myObj - neighborObj)/Temp), move to the worse neighbor
   2.3 Temp = alpha * Temp

Tabu Search

This strategy maintains a list of previously visited states (the Tabu list) and makes sure these states are not re-visited in subsequent exploration. The search keeps exploring the best move while skipping previously visited nodes; this way the algorithm explores paths that haven't been visited before. The search also remembers the best state found so far.

1. Initialization
   1.1 Pick an initial starting point S
   1.2 Initialize an empty Tabu list
   1.3 Set the best state to S
   1.4 Put S into the Tabu list
2. Repeat until the termination condition
   2.1 Find a neighborhood
   2.2 Locate a smaller subset of candidate moves
   2.3 Remove elements that are already in the Tabu list
   2.4 Select the best candidate and move there
   2.5 Add the new state to the Tabu list
   2.6 If the new state is better than the best state
       2.6.1 Set the best state to this state
   2.7 If the Tabu list is too large
       2.7.1 Trim the Tabu list by removing old items
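The simulated annealing pseudocode above can be sketched concretely in Python. The objective function, neighborhood size, and cooling schedule below are all made up for illustration; the point is the acceptance rule, which sometimes takes a worse neighbor so the search can escape a local bump.

```python
import math
import random

random.seed(0)

# Toy 1-D objective with a local-optimum trap: a small bump near x = -2
# and the global maximum near x = 3 (a maximization problem).
def f(x):
    return math.exp(-(x + 2) ** 2) + 2 * math.exp(-(x - 3) ** 2)

def anneal(start, temp=2.0, alpha=0.999, steps=10000):
    x, best = start, start
    accepted_worse = 0
    for _ in range(steps):
        neighbor = x + random.uniform(-0.5, 0.5)   # 2.1: random neighborhood member
        if f(neighbor) >= f(x):
            x = neighbor                           # 2.2: exploit the better neighbor
        elif random.random() < math.exp(-(f(x) - f(neighbor)) / temp):
            x = neighbor                           # 2.2: explore a worse neighbor
            accepted_worse += 1
        if f(x) > f(best):
            best = x                               # remember the best state seen
        temp *= alpha                              # 2.3: cool the temperature
    return best, accepted_worse

best, accepted_worse = anneal(start=-4.0)
print(round(best, 2), accepted_worse > 0)
```

Early on, the high temperature makes downhill moves likely (lots of exploration); as Temp decays, the rule degenerates into plain greedy hill climbing, typically leaving the search settled near the best region it has found.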
December 16, 2013
· 6,656 Views
OLAP Operation in R
OLAP (Online Analytical Processing) is a very common way to analyze raw transaction data by aggregating along different combinations of dimensions. It is a well-established field in Business Intelligence / Reporting. In this post, I will highlight the key ideas of OLAP operations and illustrate how to perform them in R.

Facts and Dimensions

The core of OLAP is the so-called "multi-dimensional data model", which contains two types of tables: "fact" tables and "dimension" tables.

- A fact table contains records that each describe an instance of a transaction. Each transaction record contains categorical attributes (which describe contextual aspects of the transaction, such as space, time, and user) as well as numeric attributes (called "measures", which describe quantitative aspects of the transaction, such as number of items sold or dollar amount).
- A dimension table contains records that further elaborate on the contextual attributes, such as user profile data, location details, etc.

In a typical multi-dimensional model:

- Each fact table contains foreign keys that reference the primary keys of multiple dimension tables. In its simplest form, this is called a STAR schema.
- Dimension tables can contain foreign keys that reference other dimension tables, providing a more sophisticated breakdown of the contextual aspects. This is called a SNOWFLAKE schema.
- Although it is not a hard rule, fact tables tend to be independent of each other and usually don't contain references to one another; however, different fact tables usually share the same set of dimension tables. This is called a GALAXY schema.
- It IS a hard rule that a dimension table NEVER references a fact table.

A simple STAR schema is shown in the following diagram. Each dimension can also be hierarchical, so that the analysis can be done at different levels of granularity.
For example, the time dimension can be broken down into days, weeks, months, quarters, and years; similarly, the location dimension can be broken down into countries, states, cities, etc.

Here we first create a sales fact table that records each sales transaction.

# Setup the dimension tables
state_table <- data.frame(key=c("CA", "NY", "WA", "ON", "QU"),
                          name=c("California", "new York", "Washington", "Ontario", "Quebec"),
                          country=c("USA", "USA", "USA", "Canada", "Canada"))

month_table <- data.frame(key=1:12,
                          desc=c("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                                 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"),
                          quarter=c("Q1","Q1","Q1","Q2","Q2","Q2","Q3","Q3","Q3","Q4","Q4","Q4"))

prod_table <- data.frame(key=c("Printer", "Tablet", "Laptop"),
                         price=c(225, 570, 1120))

# Function to generate the Sales table
gen_sales <- function(no_of_recs) {
  # Generate transaction data randomly
  loc <- sample(state_table$key, no_of_recs, replace=T, prob=c(2,2,1,1,1))
  time_month <- sample(month_table$key, no_of_recs, replace=T)
  time_year <- sample(c(2012, 2013), no_of_recs, replace=T)
  prod <- sample(prod_table$key, no_of_recs, replace=T, prob=c(1, 3, 2))
  unit <- sample(c(1,2), no_of_recs, replace=T, prob=c(10, 3))
  amount <- unit*prod_table[prod,]$price

  sales <- data.frame(month=time_month, year=time_year,
                      loc=loc, prod=prod,
                      unit=unit, amount=amount)

  # Sort the records by time order
  sales <- sales[order(sales$year, sales$month),]
  row.names(sales) <- NULL
  return(sales)
}

# Now create the sales fact table
sales_fact <- gen_sales(500)

# Look at a few records
head(sales_fact)

  month year loc   prod unit amount
1     1 2012  NY Laptop    1    225
2     1 2012  CA Laptop    2    450
3     1 2012  ON Tablet    2   2240
4     1 2012  NY Tablet    1   1120
5     1 2012  NY Tablet    2   2240
6     1 2012  CA Laptop    1    225

Multi-dimensional Cube

Now we turn this fact table into a hypercube with multiple dimensions. Each cell in the cube represents an aggregate value for a unique combination of dimensions.
# Build up a cube
revenue_cube <- tapply(sales_fact$amount,
                       sales_fact[,c("prod", "month", "year", "loc")],
                       FUN=function(x){return(sum(x))})

# Showing the cells of the cube
revenue_cube

, , year = 2012, loc = CA

         month
prod         1     2     3    4    5    6    7    8    9   10   11   12
  Laptop  1350   225   900  675  675   NA  675 1350   NA 1575  900 1350
  Printer   NA  2280    NA   NA 1140  570  570  570   NA  570 1710   NA
  Tablet  2240  4480 12320 3360 2240 4480 3360 3360 5600 2240 2240 3360

, , year = 2013, loc = CA

         month
prod         1     2     3    4    5    6    7    8    9   10   11   12
  Laptop   225   225   450  675  225  900  900  450  675  225  675 1125
  Printer   NA  1140    NA 1140  570   NA   NA  570   NA 1140 1710 1710
  Tablet  3360  3360  1120 4480 2240 1120 7840 3360 3360 1120 5600 4480

, , year = 2012, loc = NY

         month
prod         1     2     3    4    5    6    7    8    9   10   11   12
  Laptop   450   450    NA   NA  675  450  675   NA  225  225   NA  450
  Printer   NA  2280    NA 2850  570   NA   NA 1710 1140   NA  570   NA
  Tablet  3360 13440  2240 2240 2240 5600 5600 3360 4480 3360 4480 3360

, , year = 2013, loc = NY
.....

dimnames(revenue_cube)

$prod
[1] "Laptop"  "Printer" "Tablet"

$month
[1] "1"  "2"  "3"  "4"  "5"  "6"  "7"  "8"  "9"  "10" "11" "12"

$year
[1] "2012" "2013"

$loc
[1] "CA" "NY" "ON" "QU" "WA"

OLAP Operations

Here are some common OLAP operations:
- Slice
- Dice
- Rollup
- Drilldown
- Pivot

"Slice" is about fixing certain dimensions and analyzing the remaining ones. For example, we can focus on the sales happening in "2012", "Jan", or we can focus on the sales happening in "2012", "Jan", "Tablet".

# Slice
# cube data in Jan, 2012
revenue_cube[, "1", "2012",]

       loc
prod      CA   NY   ON   QU   WA
Laptop  1350  450   NA  225  225
Printer   NA   NA   NA 1140   NA
Tablet  2240 3360 5600 1120 2240

# cube data for Tablet in Jan, 2012
revenue_cube["Tablet", "1", "2012",]

  CA   NY   ON   QU   WA
2240 3360 5600 1120 2240

"Dice" is about limiting each dimension to a certain range of values, while keeping the number of dimensions the same in the resulting cube. For example, we can focus on sales happening in [Jan/Feb/Mar, Laptop/Tablet, CA/NY].
revenue_cube[c("Tablet","Laptop"), c("1","2","3"), , c("CA","NY")]

, , year = 2012, loc = CA

        month
prod        1     2     3
  Tablet 2240  4480 12320
  Laptop 1350   225   900

, , year = 2013, loc = CA

        month
prod        1    2    3
  Tablet 3360 3360 1120
  Laptop  225  225  450

, , year = 2012, loc = NY

        month
prod        1     2    3
  Tablet 3360 13440 2240
  Laptop  450   450   NA

, , year = 2013, loc = NY

        month
prod        1    2    3
  Tablet 3360 4480 6720
  Laptop  450   NA  225

"Rollup" is about applying an aggregation function to collapse a number of dimensions. For example, we may want to focus on the annual revenue of each product, collapsing the location dimension (i.e. we don't care where we sold our products).

apply(revenue_cube, c("year", "prod"),
      FUN=function(x) {return(sum(x, na.rm=TRUE))})

      prod
year   Laptop Printer Tablet
  2012  22275   31350 179200
  2013  25200   33060 166880

"Drilldown" is the reverse of "rollup": it applies an aggregation function at a finer level of granularity. For example, we may want to focus on the annual and monthly revenue of each product, again collapsing the location dimension.

apply(revenue_cube, c("year", "month", "prod"),
      FUN=function(x) {return(sum(x, na.rm=TRUE))})

, , prod = Laptop

      month
year      1    2    3    4    5    6    7    8    9   10   11   12
  2012 2250 2475 1575 1575 2250 1800 1575 1800  900 2250 1350 2475
  2013 2250  900 1575 1575 2250 2475 2025 1800 2025 2250 3825 2250

, , prod = Printer

      month
year      1    2    3    4    5    6    7    8    9   10   11   12
  2012 1140 5700  570 3990 4560 2850 1140 2850 2850 1710 3420  570
  2013 1140 4560 3420 4560 2850 1140  570 3420 1140 3420 3990 2850

, , prod = Tablet

      month
year       1     2     3     4     5     6     7     8     9    10    11    12
  2012 14560 23520 17920 12320 10080 14560 13440 15680 25760 12320 11200  7840
  2013  8960 11200 10080  7840 14560 10080 29120 15680 15680  8960 12320 22400

"Pivot" is about analyzing the combination of a pair of selected dimensions. For example, we may want to analyze the revenue by year and month, or the revenue by product and location.
apply(revenue_cube, c("year", "month"),
      FUN=function(x) {return(sum(x, na.rm=TRUE))})

      month
year       1     2     3     4     5     6     7     8     9    10    11    12
  2012 17950 31695 20065 17885 16890 19210 16155 20330 29510 16280 15970 10885
  2013 12350 16660 15075 13975 19660 13695 31715 20900 18845 14630 20135 27500

apply(revenue_cube, c("prod", "loc"),
      FUN=function(x) {return(sum(x, na.rm=TRUE))})

         loc
prod        CA     NY    ON    QU    WA
  Laptop  16425   9450  7650  7425  6525
  Printer 15390  19950  7980 10830 10260
  Tablet  90720 117600 45920 34720 57120

I hope this gives you a taste of the richness of R's data processing model. Keep in mind, however, that R does all its processing in RAM, so your data must be small enough to fit into the memory of a single machine.
July 30, 2013
· 17,208 Views · 3 Likes
Text Processing, Part 2: Oh, Inverted Index
This is the second part of my text processing series. In this blog, we'll look into how text documents can be stored in a form that can be easily retrieved by a query. I'll use the popular open source Apache Lucene index for illustration.

There are two main processing flows in the system:
- Document indexing: Given a document, add it to the index
- Document retrieval: Given a query, retrieve the most relevant documents from the index

The following diagram illustrates how this is done in Lucene.

Index Structure

Both documents and queries are represented as a bag of words. In Apache Lucene, a "Document" is the basic unit for storage and retrieval. A "Document" contains multiple "Fields" (also called zones). Each "Field" contains multiple "Terms" (equivalent to words).

To control how the document is indexed across its containing fields, a Field can be declared in multiple ways to specify whether it should be analyzed (a pre-processing step during indexing), indexed (participating in the index), or stored (in case it needs to be returned in the query result):
- Keyword (Not analyzed, Indexed, Stored)
- Unindexed (Not analyzed, Not indexed, Stored)
- Unstored (Analyzed, Indexed, Not stored)
- Text (Analyzed, Indexed, Stored)

The inverted index is the core data structure of the storage. It is organized in an inverted manner, from terms to the list of documents that contain each term. The list (known as the posting list) is ordered by a global ordering (typically by document id). To enable faster retrieval, the list is not just a single list but a hierarchy of skip lists; for simplicity, we ignore the skip lists in the subsequent discussion.

This data structure is illustrated below based on Lucene's implementation. It is stored on disk as segment files, which are brought into memory during processing. The above diagram only shows the inverted index; the whole index contains an additional forward index as follows.

Document indexing

A document in its raw form is extracted from a data adaptor.
(This can mean making a Web API call to retrieve some text output, crawling a web page, or receiving an HTTP document upload.) This can be done in a batch or online manner.

When index processing starts, it parses each raw document and analyzes its text content. The typical steps include:
- Tokenize the document (break it down into words)
- Lowercase each word (to make the index case-insensitive, but be careful with names and abbreviations)
- Remove stop words (take out high-frequency words like "the" and "a", but be careful with phrases)
- Stemming (normalize different forms of the same word, e.g. reduce "run", "running", "ran" to "run")
- Synonym handling. This can be done in two ways: either expand a term to include its synonyms (i.e. if the term is "huge", add "gigantic" and "big"), or reduce a term to a normalized synonym (i.e. if the term is "gigantic" or "huge", change it to "big")

At this point, the document is composed of multiple terms: doc = [term1, term2 ...]. Optionally, terms can be further combined into n-grams. After that, we count the term frequencies of the document. For example, in a bi-gram expansion, the document will become:

doc1 -> {term1: 5, term2: 8, term3: 4, term1_2: 3, term2_3: 1}

We may also compute a "static score" based on some measure of the document's quality. After that, we insert the document into the posting list of each term (all n-grams), creating a posting list if one doesn't exist yet. This builds the inverted index structure shown in the previous diagram.

A boost factor can be set on a document or field. The boost factor effectively multiplies the term frequency, which in turn affects the importance of the document or field.

A document can be added to the index in one of the following ways: inserted, modified, or deleted. Typically the document is first added to a memory buffer, which is organized as an inverted index in RAM.
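The analysis and posting-list construction just described can be sketched in a few lines of Python. This is a minimal illustration, not Lucene's implementation: the stop-word list is a made-up stand-in for a real analyzer, and stemming, n-grams, and static scores are omitted.

```python
import re
from collections import Counter, defaultdict

# Hypothetical stop-word list standing in for a real analyzer's list
STOP_WORDS = {"the", "a", "an", "of", "to"}

def analyze(text):
    """Tokenize, lowercase, and strip stop words (no stemming in this sketch)."""
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    return [t for t in tokens if t not in STOP_WORDS]

def index_documents(docs):
    """Build posting lists: term -> [(doc_id, term_frequency), ...].
    Each posting list stays ordered by doc id because documents
    are indexed in increasing doc-id order."""
    postings = defaultdict(list)
    for doc_id, text in docs:
        for term, tf in sorted(Counter(analyze(text)).items()):
            postings[term].append((doc_id, tf))
    return postings

docs = [(1, "the quick brown fox"), (2, "the lazy brown dog")]
postings = index_documents(docs)
# postings["brown"] -> [(1, 1), (2, 1)]; "the" never makes it into the index
```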
- For a document insertion, it goes through the normal indexing process (as described above) to analyze the document and build an inverted list in RAM.
- For a document deletion (the client request only contains the doc id), it fetches the document content from the forward index, then goes through the normal indexing process to analyze the document and build the inverted list. In this case, however, the doc object in the inverted list is labeled as "deleted".
- For a document update (the client request contains the modified document), it is handled as a deletion followed by an insertion: the system first fetches the old document from the forward index to build an inverted list with nodes marked "deleted", and then builds a new inverted list from the modified document. For example, if doc1 = "A B" is updated to "A C", the posting list becomes {A: doc1(deleted) -> doc1, B: doc1(deleted), C: doc1}. After collapsing A, the posting list becomes {A: doc1, B: doc1(deleted), C: doc1}.

As more and more documents are inserted into the memory buffer, it becomes full and is flushed to a segment file on disk. In the background, once M segment files have accumulated, Lucene merges them into bigger segment files. Notice that the size of the segment files at each level increases exponentially (M, M^2, M^3). This keeps the number of segment files that need to be searched per query at O(logN), where N is the number of documents in the index. Lucene also provides an explicit "optimize" call that merges all the segment files into one.

Let's detail the merging process a bit. Since each posting list is already vertically ordered by term and horizontally ordered by doc id, merging two segment files S1 and S2 works basically as follows:
- Walk the posting lists of S1 and S2 together in sorted term order.
- For the non-common terms (a term that appears in one of S1 or S2 but not both), write out the posting list to a new segment S3.
- When we find a common term T, we merge the corresponding posting lists from the two segments. Since both lists are sorted by doc id, we just walk down both posting lists and write out the doc objects to a new posting list. When both posting lists contain the same doc (which is the case when the document was updated or deleted), we pick the latest doc based on time order.
- Finally, the doc frequency of each posting list (of the corresponding term) is computed.

Document retrieval

Consider a document as a vector (each term is a separate dimension and the corresponding value is the tf-idf value) and the query as a vector as well. The document retrieval problem can then be defined as finding the top-k documents most similar to the query, where similarity is defined as the dot product or cosine distance between the document vector and the query vector.

tf-idf is a normalized frequency. TF (term frequency) represents how many times the term appears in the document (usually with a compression function such as square root or logarithm applied). IDF is the inverse of the document frequency, used to discount the significance of a term that appears in many other documents. There are many variants of TF-IDF, but it generally reflects the strength of association between the document (or query) and each term.

Given a query Q containing terms [t1, t2], here is how we fetch the corresponding documents. A common approach is the "document at a time" approach, where we traverse the posting lists of t1 and t2 concurrently (as opposed to the "term at a time" approach, where we traverse the whole posting list of t1 before starting on the posting list of t2). The traversal process is as follows:
- For each term t1, t2 in the query, we identify the corresponding posting lists.
- We walk each posting list concurrently to return a sequence of documents (ordered by doc id). Note that each returned document contains at least one of the query terms but may contain multiple.
- We compute the dynamic score, which is the dot product of the query and document vectors. Notice that we typically don't bother with the TF/IDF of the query itself (it is short, so we don't care about the frequency of each term). Therefore we can simply sum the TF scores of each matching posting list, scaled by the corresponding IDF score (stored at the head of each posting list). Lucene also supports query-level boosting, where a boost factor can be attached to query terms; the boost factor multiplies the term frequency correspondingly.
- We also look up the static score, which is based purely on the document (not the query). The total score is a linear combination of the static and dynamic scores.

Although the score in the above calculation is based on the cosine distance between the query and the document, we are not restricted to that. We can plug in any similarity function that makes sense for the domain (e.g. we can use machine learning to train a model that scores the similarity between a query and a document).

After we compute the total score, we insert the document into a heap data structure in which the top-k scored documents are maintained.

Here the whole posting list is traversed. If a posting list is very long, the response latency will be long as well. Is there a way to avoid traversing the whole list and still find the approximate top k documents? There are a couple of strategies we can consider.

Static Score Posting Order: Notice that the posting list is sorted by a global order. This global ordering provides a monotonically increasing document id during traversal, which is important for the "document at a time" traversal because it guarantees the same document is never visited twice. This global ordering, however, can be quite arbitrary and doesn't have to be the document id. So we can pick the ordering to be based on the static score (e.g. a quality indicator of the document), which is also global.
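The scoring-and-heap step above can be sketched as follows. This is a simplified illustration under made-up assumptions, not Lucene's actual scoring: postings are modeled as term -> {doc_id: tf} dicts, IDF is log(N / doc_frequency), the score is a plain sum of tf * idf over matching terms, and static scores and boosts are omitted.

```python
import heapq
import math

def top_k(query_terms, postings, n_docs, k):
    """Document-at-a-time sketch: score every candidate document by
    sum(tf * idf) over the query terms, keeping the top k in a min-heap."""
    idf = {t: math.log(n_docs / len(postings[t])) for t in query_terms}
    # Candidate docs contain at least one query term; walk them in doc-id order.
    candidates = sorted(set().union(*(postings[t] for t in query_terms)))
    heap = []
    for doc in candidates:
        score = sum(postings[t].get(doc, 0) * idf[t] for t in query_terms)
        heapq.heappush(heap, (score, doc))
        if len(heap) > k:
            heapq.heappop(heap)        # drop the lowest-scored doc so far
    return sorted(heap, reverse=True)  # highest score first

# Toy usage: doc 2 wins because it matches the rarer (higher-IDF) term "b"
postings = {"a": {1: 3, 2: 1}, "b": {2: 2}}
best = top_k(["a", "b"], postings, n_docs=3, k=1)
```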
The idea is that we traverse the posting list in decreasing magnitude of static score, so we are more likely to visit the documents with the higher total score (static + dynamic) first.

Cut frequent terms: We do not traverse the posting lists of terms with a low IDF value (i.e. terms that appear in many documents, whose posting lists therefore tend to be long). This way we avoid traversing long posting lists.

TopR list: For each posting list, we create an extra posting list containing the top R documents with the highest TF (term frequency) in the original list. When we perform a search, we search this top-R list instead of the original posting list.

Since we have multiple inverted indexes (in the memory buffer as well as in the segment files at different levels), we need to combine their results. If termX appears in both segmentA and segmentB, the fresher version is picked. The fresher version is determined as follows: the segment at a lower level (smaller size) is considered fresher; if two segment files are at the same level, the one with the higher number is fresher. The IDF value, on the other hand, will be the sum of the corresponding IDFs of each posting list in the segment files (the value will be slightly off if the same document has been updated, but such a discrepancy is negligible).

However, consolidating multiple segment files incurs processing overhead during document retrieval. Lucene provides an explicit "optimize" call that merges all segment files into one single file, so there is no need to look at multiple segment files during document retrieval.

Distributed Index

For a large corpus (like web documents), the index is typically distributed across multiple machines. There are two models of distribution: term partitioning and document partitioning. In document partitioning, documents are randomly spread across the different partitions where the index is built.
In term partitioning, the terms are spread across different partitions. We'll discuss document partitioning as it is more commonly used. Distributed indexes are provided by other technologies built on top of Lucene, such as ElasticSearch. A typical setting is as follows.

In this setting, machines are organized in columns and rows. Each column represents a partition of documents, while each row represents a replica of the whole corpus.

During document indexing, a row of machines is first randomly selected and allocated for building the index. When a new document is crawled, a column machine from the selected row is randomly picked to host the document. The document is sent to this machine, where the index is built. The updated index is later propagated to the replicas in the other rows.

During document retrieval, a row of replica machines is first selected. The client query is then broadcast to every column machine of the selected row. Each machine performs the search in its local index and returns its top-M elements to the query processor, which consolidates the results before sending them back to the client. Note that K/P < M < K, where K is the number of top documents the client expects and P is the number of columns of machines; M is a parameter that needs to be tuned.

One caveat of this distributed index is that, as the posting list is split horizontally across partitions, we lose the global view of the IDF value, without which a machine is unable to calculate the TF-IDF score. There are two ways to mitigate this:
- Do nothing: here we assume the documents are evenly spread across partitions, so the local IDF is a good approximation of the actual IDF.
- Extra round trip: in the first round, the query is broadcast to every column, which returns its local IDF. The query processor collects all the IDF responses and computes their sum.
In the second round, it broadcasts the query along with the IDF sum to each column of machines, which compute the local score based on the IDF sum.
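The scatter-gather retrieval just described can be sketched as follows. This is a toy illustration: the partition interface (a callable returning local top-M (score, doc_id) pairs) and the scores are made up, and the network hop is just a function call.

```python
import heapq

def scatter_gather(query, partitions, k, m):
    """Broadcast `query` to every column partition of the selected row,
    collect each partition's local top-m hits, and consolidate them
    into a global top-k at the query processor (K/P < m < k per the text)."""
    all_hits = []
    for search in partitions:
        all_hits.extend(search(query, m))   # local top-m from each column
    return heapq.nlargest(k, all_hits)      # merge: global top-k by score

# Toy usage with two fake partitions
p1 = lambda q, m: [(0.9, "d1"), (0.4, "d3")][:m]
p2 = lambda q, m: [(0.8, "d2"), (0.1, "d4")][:m]
result = scatter_gather("q", [p1, p2], k=3, m=2)
# result -> [(0.9, 'd1'), (0.8, 'd2'), (0.4, 'd3')]
```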
February 26, 2013
· 8,415 Views
Optimization in R
Optimization is a very common problem in data analytics. Given a set of variables (which one has control over), how do we pick the right values such that some benefit is maximized? More formally, optimization is about determining a set of variables x1, x2, ... that maximize or minimize an objective function f(x1, x2, ...).

Unconstrained optimization

In the unconstrained case, each variable can be freely selected within its full range. A typical solution is to compute the gradient vector of the objective function [∂f/∂x1, ∂f/∂x2, ...], set it to [0, 0, ...], and solve for x1, x2, ..., which gives a local optimum. In R, this can be done with a numerical method:

> f <- function(x){(x[1] - 5)^2 + (x[2] - 6)^2}
> initial_x <- c(10, 11)
> x_optimal <- optim(initial_x, f, method="CG")
> x_min <- x_optimal$par
> x_min
[1] 5 6

Equality constraint optimization

Moving on to the constrained case, let's say x1, x2, ... are not independent and have to relate to each other in some particular way: g1(x1, x2, ...) = 0, g2(x1, x2, ...) = 0. The optimization problem can be expressed as:

Maximize the objective function: f(x1, x2, ...)
Subject to equality constraints:
g1(x1, x2, ...) = 0
g2(x1, x2, ...) = 0

A typical solution is to turn the constrained optimization problem into an unconstrained one using Lagrange multipliers. Define a new function F as follows:

F(x1, x2, ..., λ1, λ2, ...) = f(x1, x2, ...) + λ1.g1(x1, x2, ...) + λ2.g2(x1, x2, ...) + ...

Then solve for:

[∂F/∂x1, ∂F/∂x2, ..., ∂F/∂λ1, ∂F/∂λ2, ...] = [0, 0, ...]

Inequality constraint optimization

In this case, the constraints are inequalities. We cannot use the Lagrange multiplier technique because it requires equality constraints. There is no general solution for arbitrary inequality constraints, but we can restrict the form the constraints take.
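To make the Lagrange-multiplier recipe concrete before moving on, here is a small hand-solved instance in Python, using the same objective as the optim() example: f = (x1 - 5)^2 + (x2 - 6)^2. The equality constraint g = x1 + x2 - 10 = 0 is made up purely for illustration.

```python
def lagrange_solve():
    """Solve [dF/dx1, dF/dx2, dF/dλ] = [0, 0, 0] for
    F = (x1-5)^2 + (x2-6)^2 + λ(x1 + x2 - 10), i.e. the linear system:
        2*x1 + λ = 10
        2*x2 + λ = 12
        x1 + x2  = 10
    Eliminating λ gives x1 - x2 = -1; combined with x1 + x2 = 10,
    the constrained minimum falls out directly."""
    x1 = (10 - 1) / 2        # from x1 - x2 = -1 and x1 + x2 = 10
    x2 = 10 - x1
    lam = 10 - 2 * x1        # back-substitute into 2*x1 + λ = 10
    return x1, x2, lam

x1, x2, lam = lagrange_solve()
# (x1, x2) == (4.5, 5.5): the unconstrained optimum (5, 6) pulled onto the line x1 + x2 = 10
```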
In the following, we study two models where each constraint is restricted to be a linear function of the variables: w1.x1 + w2.x2 + ... >= 0

Linear Programming

Linear programming is a model where both the objective function and the constraint functions are restricted to linear combinations of the variables. The linear programming problem can be defined as follows:

Maximize the objective function: f(x1, x2) = c1.x1 + c2.x2
Subject to inequality constraints:
a11.x1 + a12.x2 <= b1
a21.x1 + a22.x2 <= b2
a31.x1 + a32.x2 <= b3
x1 >= 0, x2 >= 0

As an illustrative example, let's walk through a portfolio investment problem, where we want to find an optimal way to allocate the proportions of assets in our investment portfolio.

StockA gives a 5% return on average
StockB gives a 4% return on average
StockC gives a 6% return on average

To set some constraints, let's say my investment in C must be less than the sum of A and B. Also, the investment in A cannot be more than twice that of B. Finally, at least 10% of the investment must go into each stock.
To formulate this problem:

Variables: x1 = % investment in A, x2 = % in B, x3 = % in C

Maximize expected return: f(x1, x2, x3) = x1*5% + x2*4% + x3*6%

Subject to constraints:
10% < x1, x2, x3 < 100%
x1 + x2 + x3 = 1
x3 < x1 + x2
x1 < 2 * x2

> library(lpSolve)
> library(lpSolveAPI)
> # Set the number of vars
> model <- make.lp(0, 3)
> # Define the objective function: for Minimize, use -ve
> set.objfn(model, c(-0.05, -0.04, -0.06))
> # Add the constraints
> add.constraint(model, c(1, 1, 1), "=", 1)
> add.constraint(model, c(1, 1, -1), ">", 0)
> add.constraint(model, c(1, -2, 0), "<", 0)
> # Set the upper and lower bounds
> set.bounds(model, lower=c(0.1, 0.1, 0.1), upper=c(1, 1, 1))
> # Compute the optimized model
> solve(model)
[1] 0
> # Get the value of the optimized parameters
> get.variables(model)
[1] 0.3333333 0.1666667 0.5000000
> # Get the value of the objective function
> get.objective(model)
[1] -0.05333333
> # Get the value of the constraint
> get.constraints(model)
[1] 1 0 0

Quadratic Programming

Quadratic programming is a model where the objective function is quadratic (contains products of up to two variables) while the constraint functions remain linear combinations of the variables. The quadratic programming problem can be defined as follows:

Minimize the quadratic objective function:
f(x1, x2) = c1.x1^2 + c2.x1.x2 + c3.x2^2 - (d1.x1 + d2.x2)
Subject to constraints:
a11.x1 + a12.x2 == b1
a21.x1 + a22.x2 == b2
a31.x1 + a32.x2 >= b3
a41.x1 + a42.x2 >= b4
a51.x1 + a52.x2 >= b5

Expressed in matrix form:
Minimize the objective ½(x^T.D.x) - d^T.x
Subject to the constraint A^T.x >= b, where the first k columns of A are the equality constraints

As an illustrative example, let's continue with the portfolio investment problem, where we want to find an optimal way to allocate the proportions of assets in our investment portfolio.
StockA gives a 5% return on average
StockB gives a 4% return on average
StockC gives a 6% return on average

We also look at the variance of each stock (known as its risk) as well as the covariance between stocks. For an investment, we not only want a high expected return but also a low variance; in other words, a stock with a high return but also high variance is not very attractive. Therefore, maximizing the expected return while minimizing the variance is the typical investment strategy. One way to minimize variance is to pick multiple stocks (in a portfolio) to diversify the investment. Diversification happens when the stock components within the portfolio move away from their average in different directions (hence the variance is reduced).

The covariance matrix ∑ (between each pair of stocks) is given as follows:
1%,   0.2%, 0.5%
0.2%, 0.8%, 0.6%
0.5%, 0.6%, 1.2%

In this example, we want to achieve an overall return of at least 5.2% while minimizing the variance of the return.

To formulate the problem:

Variables: x1 = % investment in A, x2 = % in B, x3 = % in C

Minimize variance: x^T.∑.x

Subject to constraints:
x1 + x2 + x3 == 1
x1*5% + x2*4% + x3*6% >= 5.2%
0% < x1, x2, x3 < 100%

> library(quadprog)
> mu_return_vector <- c(0.05, 0.04, 0.06)
> sigma <- matrix(c(0.01, 0.002, 0.005,
+                   0.002, 0.008, 0.006,
+                   0.005, 0.006, 0.012),
+                 nrow=3, ncol=3)
> D.Matrix <- 2*sigma
> d.Vector <- rep(0, 3)
> A.Equality <- matrix(c(1,1,1), ncol=1)
> A.Matrix <- cbind(A.Equality, mu_return_vector, diag(3))
> b.Vector <- c(1, 0.052, rep(0, 3))
> out <- solve.QP(Dmat=D.Matrix, dvec=d.Vector,
+                 Amat=A.Matrix, bvec=b.Vector, meq=1)
> out$solution
[1] 0.4 0.2 0.4
> out$value
[1] 0.00672

Integration with Predictive Analytics

Optimization is usually integrated with predictive analytics, another important part of data analytics. For an overview of predictive analytics, see my previous blog about it.
The overall processing can be depicted as follows. In this diagram, we use machine learning to train a predictive model in batch mode. Once the predictive model is available, observation data is fed into it in real time and a set of output variables is predicted. These output variables are fed into the optimization model as the environment parameters (e.g. the return of each stock, the covariance, etc.), from which a set of optimal decision variables is recommended.
January 15, 2013
· 8,907 Views
Detecting Communities in Social Graph
In analyzing a social network, one common problem is how to detect communities, such as groups of people who know or interact frequently with each other. A community is a subgraph of a graph in which the connectivity is unusually dense. In this blog, I will enumerate some common algorithms for finding communities.

First of all, community detection can be thought of as a graph partitioning problem. In this case, a single node belongs to no more than one community; in other words, communities do not overlap with each other.

High Betweenness Edge Removal

The intuition is that members within a community are densely connected and have many paths to reach each other. Nodes from different communities, on the other hand, require inter-community links to reach each other, and these inter-community links tend to have high betweenness scores. Therefore, by removing these high-betweenness links, the graph will be segregated into communities. The algorithm:
- For each edge, compute the edge-betweenness score
- Remove the edge with the highest betweenness score
- Repeat until there is enough segregation

However, while this method achieves good results, it is very slow and does not work effectively when there are more than a couple thousand nodes with dense edges.

Hierarchical Clustering

This is a very general approach to detecting communities. Some measure of distance (or similarity) is first defined and computed between every pair of nodes; then a classical hierarchical clustering technique can be applied. The distance should be chosen such that it is small between members of the same community and large between members of different communities.

Random Walk

A random walk can be used to compute the distance between a pair of nodes, say node-B and node-C. Let's focus on undirected graphs for now. A random walker starts at node-B; at each step it has probability beta of randomly picking a neighbor to visit (weighted by the link weights), and probability 1 - beta of teleporting back to the original node-B.
After a large number of steps, the probability of landing on node-C will be high if node-B and node-C belong to the same community. The intuition is that the random walker tends to be trapped within the community, so all nodes with a high landing probability tend to be in the same community as node-B (where the random walker started).

Notice that the choice of beta is important. If it is too big (close to 1), the probability after convergence is independent of the starting node (i.e. the probability distribution only reflects the centrality of each node, not the community of the starting node). If beta is too small (close to zero), the walker dies down too quickly, before it fully explores the community's connectivity.

There is an analytical solution to this problem. Let M be the transition matrix between every pair of nodes, and let V represent the probability distribution of where the random walker is. The "distance" between node-B and every other node is given by the corresponding eigenvector of M. We can repeat the same computation for all pairs of nodes, and then feed the result into a hierarchical clustering algorithm.

Label Propagation

The basic idea is that each node observes its neighbors and sets its own label to the majority label among them. Nodes are initially assigned unique labels. In each round, each node examines the labels of all its neighbors and sets its own label to the majority label; when there is a tie, the winner is picked randomly. This repeats until there are no more changes in the label assignments.

Modularity Optimization

Within a community, the probability of two nodes having a link should be higher than if the link were just formed randomly within the whole graph.

probability of a random link = deg(node-B) * deg(node-C) / (2 * E), where E is the total number of edges
the actual link = AdjacencyMatrix[B, C]

Define com(B) to be the community of node-B and com(C) the community of node-C. A utility function, "Modularity", is then created as follows:
sum over all pairs (B, C) of: (actual - random) * delta(com(B), com(C))

Now we examine communities that can overlap, i.e. a single node can belong to more than one community.

Finding Cliques

Simple community detection usually starts with cliques. A clique is a subgraph in which every node is connected to every other node. A K-clique has K nodes and K(K-1)/2 links between them. Communities, however, have a looser definition: we don't require everyone to know every other person within the community, but we need them to know "enough" (maybe a certain percentage) of the other people in the community.

A K-core is a more relaxed definition: it requires each node of the K-core to be connected to at least K other members. There are some less popular relaxations as well. A K-clan requires every node to be connected to every other member within K steps (path length at most K). A K-plex requires each node to be connected to at least (N - K) other members, where N is the total number of members in the K-plex. The community is then defined as the found K-core, K-clan, or K-plex.

K-Clique Percolation

Another popular way of finding communities is by rolling across adjacent K-cliques. Two K-cliques are adjacent if they share K-1 nodes. K is a parameter that we need to pick based on how dense we expect the communities to be. The algorithm is illustrated in the following diagram. K-clique percolation is a popular way to identify communities, which can potentially overlap with each other.
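Of the algorithms above, label propagation is the easiest to sketch in code. A minimal Python version follows; note that for reproducibility this sketch breaks ties by picking the smallest label, whereas the text breaks them randomly, and the two disconnected triangles are a made-up toy graph.

```python
from collections import Counter

def label_propagation(adj, rounds=20):
    """Each node repeatedly adopts the majority label among its neighbors,
    stopping once a full round produces no change."""
    labels = {v: v for v in adj}          # every node starts with a unique label
    for _ in range(rounds):
        changed = False
        for v in adj:                     # asynchronous update, in node order
            counts = Counter(labels[u] for u in adj[v])
            top = max(counts.values())
            winner = min(l for l, c in counts.items() if c == top)  # deterministic tie-break
            if winner != labels[v]:
                labels[v], changed = winner, True
        if not changed:
            break
    return labels

# Toy graph: two disconnected triangles; two communities fall out
adj = {0: [1, 2], 1: [0, 2], 2: [0, 1],
       3: [4, 5], 4: [3, 5], 5: [3, 4]}
labels = label_propagation(adj)
# nodes 0,1,2 converge to one label and nodes 3,4,5 to another
```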
December 23, 2012
· 4,206 Views
Machine Learning: Measuring Similarity and Distance
Measuring similarity or distance between two data points is fundamental to many Machine Learning algorithms such as K-Nearest-Neighbor, Clustering ... etc.
August 10, 2012
· 53,444 Views · 6 Likes
Everything You Need To Know About Couchbase Architecture
After receiving a lot of good feedback and comments on my last blog on MongoDB, I was encouraged to do another deep dive on another popular document-oriented DB: Couchbase. I have been a long-time fan of CouchDB and wrote a blog on it many years ago. After it merged with Membase, I was very excited to take a deep look into it again. Couchbase is the merger of two popular NOSQL technologies:

Membase, which adds persistence, replication, and sharding to the high-performance memcached technology
CouchDB, which pioneered the document-oriented model based on JSON

Like other NOSQL technologies, both Membase and CouchDB are built from the ground up on a highly distributed architecture, with data sharded across machines in a cluster. Built around the Memcached protocol, Membase provides an easy migration path for existing Memcached users who want to add persistence, sharding, and fault resilience to their familiar Memcached model. CouchDB, on the other hand, provides first-class support for storing JSON documents as well as a simple RESTful API to access them. Underneath, CouchDB also has a highly tuned storage engine that is optimized for both update transactions and query processing. Taking the best of both technologies, Couchbase is well-positioned in the NOSQL marketplace.

Programming Model

Couchbase provides client libraries for different programming languages such as Java / .NET / PHP / Ruby / C / Python / Node.js. For reads, Couchbase provides a key-based lookup mechanism where the client is expected to provide the key, and only the server hosting the data (with that key) will be contacted. Couchbase also provides a query mechanism to retrieve data where the client provides a query (for example, a range based on some secondary key) as well as the view (basically the index). The query is broadcast to all servers in the cluster, and the results are merged and sent back to the client.
For writes, Couchbase provides a key-based update mechanism where the client sends in an updated document with the key (as the doc id). When handling a write request, the server returns to the client as soon as the data is stored in RAM on the active server, which offers the lowest latency for write requests. The following is the core API that Couchbase offers (in an abstract sense):

# Get a document by key
doc = get(key)

# Modify a document; notice the whole document
# needs to be passed in
set(key, doc)

# Modify a document only when no one has modified it
# since my last read
casVersion = doc.getCas()
cas(key, casVersion, changedDoc)

# Create a new document, with an expiration time
# after which the document will be deleted
addIfNotExist(key, doc, timeToLive)

# Delete a document
delete(key)

# When the value is an integer, increment the integer
increment(key)

# When the value is an integer, decrement the integer
decrement(key)

# When the value is an opaque byte array, append more
# data onto the existing value
append(key, newData)

# Query the data
results = query(viewName, queryParameters)

In Couchbase, the document is the unit of manipulation. Currently Couchbase doesn't support server-side execution of custom logic. The Couchbase server is basically a passive store, and unlike other document-oriented DBs, Couchbase doesn't support field-level modification. To modify a document, the client needs to retrieve it by its key, make the modification locally, and then send the whole (modified) document back to the server. This design trades off network bandwidth (since more data is transferred across the network) for CPU (the CPU load shifts to the client). Couchbase currently doesn't support bulk modification based on condition matching. Modification happens only on a per-document basis (the client saves modified documents one at a time).

Transaction Model

Similar to many NOSQL databases, Couchbase's transaction model is primitive compared to an RDBMS.
Atomicity is guaranteed only for a single document; transactions that span updates of multiple documents are unsupported. To provide the necessary isolation for concurrent access, Couchbase provides a CAS (compare and swap) mechanism which works as follows: when the client retrieves a document, a CAS ID (equivalent to a revision number) is attached to it. While the client is manipulating the retrieved document locally, another client may modify the same document. When this happens, the CAS ID of the document at the server is incremented. Now, when the original client submits its modification to the server, it attaches the original CAS ID to its request. The server verifies this ID against the actual ID on the server. If they differ, the document has been updated in between and the server will not apply the update. The original client then re-reads the document (which now has a newer ID) and re-submits its modification. Couchbase also provides a locking mechanism for clients to coordinate their access to documents. A client can request a LOCK on the document it intends to modify, update the document, and then release the LOCK. To prevent deadlock, each LOCK grant has a timeout, so it is automatically released after a period of time.

Deployment Architecture

In a typical setting, a Couchbase DB resides in a server cluster involving multiple machines. The client library connects to the appropriate servers to access the data. Each machine runs a number of daemon processes which provide data access as well as management functions. The data server, written in C/C++, is responsible for handling get/set/delete requests from clients. The management server, written in Erlang, is responsible for handling query traffic from clients, as well as managing the configuration and communicating with other member nodes in the cluster.
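The CAS retry pattern described above can be sketched in Python. The `FakeCouchbase` class below is an invented in-memory stand-in for a real client library (real clients expose equivalent gets/cas operations); it exists only to show the optimistic-concurrency loop.

```python
class FakeCouchbase:
    """In-memory stand-in for a Couchbase bucket, just enough for the sketch."""
    def __init__(self):
        self.store = {}                      # key -> (doc, cas_id)

    def gets(self, key):
        """Return a copy of the document plus its current CAS ID."""
        doc, cas_id = self.store.get(key, ({}, 0))
        return dict(doc), cas_id

    def cas(self, key, doc, cas_id):
        """Store the doc only if the caller's CAS ID is still current."""
        _, current = self.store.get(key, ({}, 0))
        if cas_id != current:
            return False                     # another client modified the doc
        self.store[key] = (dict(doc), current + 1)
        return True

def increment_counter(client, key, delta=1, max_retries=10):
    """Optimistic CAS loop: read, modify locally, submit with the CAS ID."""
    for _ in range(max_retries):
        doc, cas_id = client.gets(key)
        doc["count"] = doc.get("count", 0) + delta
        if client.cas(key, doc, cas_id):
            return doc["count"]
        # someone else won the race; re-read and retry
    raise RuntimeError("too much contention on key %r" % key)

client = FakeCouchbase()
increment_counter(client, "page_hits")
increment_counter(client, "page_hits")
```

The retry loop is the client's responsibility: the server only rejects stale writes, exactly as described above.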
Virtual Buckets

The basic unit of data storage in Couchbase is a JSON document (or a primitive value such as an int or byte array) associated with a key. The overall key space is partitioned into 1024 logical storage units called "virtual buckets" (or vBuckets). vBuckets are distributed across machines within the cluster via a map that is shared among the servers in the cluster as well as the client library. High availability is achieved through data replication at the vBucket level. Currently Couchbase supports one active vBucket and zero or more standby replicas hosted on other machines. Currently the standby servers are idle and do not serve any client requests; in a future version of Couchbase, the standby replicas will be able to serve read requests. Load balancing in Couchbase is achieved as follows:

Keys are uniformly distributed based on the hash function
When machines are added to or removed from the cluster, the administrator can request a redistribution of vBuckets so that data is evenly spread across physical machines

Management Server

The management server performs the management functions and coordinates the other nodes within the cluster. It includes the following monitoring and administration functions:

Heartbeat: A watchdog process periodically communicates with all member nodes within the same cluster to provide Couchbase Server health updates.
Process monitor: This subsystem monitors execution of the local data manager, restarting failed processes as required and providing status information to the heartbeat module.
Configuration manager: Each Couchbase Server node shares a cluster-wide configuration which contains the member nodes within the cluster and a vBucket map. The configuration manager pulls this config from other member nodes at boot time.
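To make the key-to-vBucket mapping above concrete, here is a minimal sketch. A CRC32-style hash is assumed, and the server names and replica layout are entirely made up for illustration; the real cluster map is managed by the configuration manager, not built this way.

```python
import zlib

NUM_VBUCKETS = 1024

def vbucket_for_key(key):
    """Hash a document key into one of the 1024 vBuckets."""
    return zlib.crc32(key.encode("utf-8")) % NUM_VBUCKETS

# Toy stand-in for the shared vBucket map:
# vbucket_map[v] = (active_server, [replica_servers])
vbucket_map = {v: ("server-%d" % (v % 4), ["server-%d" % ((v + 1) % 4)])
               for v in range(NUM_VBUCKETS)}

def server_for_key(key):
    """The client contacts only the active server hosting the key's vBucket."""
    active, _replicas = vbucket_map[vbucket_for_key(key)]
    return active
```

Because the map is shared with the client library, a key-based get/set never needs a broadcast: the client computes the vBucket locally and talks directly to its active server.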
Within a cluster, one node's management server is elected as the leader, which performs the following cluster-wide management functions:

Controls the distribution of vBuckets among other nodes and initiates vBucket migration
Orchestrates failover and updates the configuration managers of member nodes

If the leader node crashes, a new leader is elected from the surviving members in the cluster. When a machine in the cluster crashes, the leader detects it and notifies the member machines in the cluster that all vBuckets hosted on the crashed machine are dead. After getting this signal, machines hosting the corresponding vBucket replicas set those vBuckets' status to "active". The vBucket/server map is updated and eventually propagated to the client library. Notice that at this moment the replication level of the vBucket is reduced. Couchbase doesn't automatically re-create new replicas, which would cause data-copying traffic; the administrator can issue a command to explicitly initiate a data rebalance. The crashed machine can rejoin the cluster after a reboot. At that point, all the data it previously stored is completely discarded and the machine is treated as a brand-new, empty machine. As more machines are put into the cluster (for scaling out), vBuckets should be redistributed to achieve load balance. This is currently triggered by an explicit command from the administrator. Once it receives the "rebalance" command, the leader computes a new provisional map with a balanced distribution of vBuckets and sends this provisional map to all members of the cluster. To compute the vBucket map and migration plan, the leader attempts the following objectives:

Evenly distribute the number of active vBuckets and replica vBuckets among member nodes.
Place the active copy and each of its replicas on physically separate nodes.
Spread the replica vBuckets as widely as possible among the other member nodes.
Minimize the amount of data migration.
Orchestrate the steps of replica redistribution so that no node or network link is overwhelmed by the replica migration.

Once the vBucket map is determined, the leader passes the redistribution map to each member in the cluster and coordinates the steps of vBucket migration. The actual data transfer happens directly between the origination node and the destination node. Notice that since there are generally more vBuckets than machines, the migration workload is evenly distributed automatically. For example, when new machines are added to the cluster, all existing machines migrate some portion of their vBuckets to the new machines; there is no single bottleneck in the cluster. Throughout the migration and redistribution of vBuckets among servers, the life cycle of a vBucket on a server is in one of the following states:

"Active": the server hosting the vBucket is ready to handle both read and write requests.
"Replica": the server hosts a copy of the vBucket that may be slightly out of date but can take read requests that tolerate some staleness.
"Pending": the server hosts a copy that is in a critical transitional state; the server can take neither read nor write requests at this moment.
"Dead": the server is no longer responsible for the vBucket and will not take read or write requests anymore.

Data Server

The data server implements the memcached APIs such as get, set, delete, append, prepend, etc. It contains the following key data structure: one in-memory hashtable (keyed by doc id) per hosted vBucket. The hashtable acts both as metadata for all documents and as a cache for the document content. Maintaining the entry gives a quick way to detect whether a document exists on disk.
To support asynchronous writes, there is also a checkpoint linked list per vBucket holding the doc ids of modified documents that haven't yet been flushed to disk or replicated to the replica.

To handle a "GET" request, the data server routes the request to the ep-engine responsible for the vBucket. The ep-engine looks up the document id in the in-memory hashtable. If the document content is found in the cache (stored as the value in the hashtable), it is returned. Otherwise, a background disk-fetch task is created and queued into the RO dispatcher queue. The RO dispatcher then reads the value from the underlying storage engine and populates the corresponding entry in the vBucket hashtable. Finally, the notification thread notifies the memcached pending connection of the disk-fetch completion, so that the memcached worker thread can revisit the engine to process the get request.

To handle a "SET" request, a success response is returned to the calling client once the updated document has been put into the in-memory hashtable, with a write request placed into the checkpoint buffer. Later on, the flusher thread picks up the outstanding write requests from each checkpoint buffer, looks up the corresponding document content in the hashtable, and writes it out to the storage engine. Of course, data can be lost if the server crashes before the data has been replicated to another server and/or persisted. If the client requires high data availability across crashes, it can issue a subsequent observe() call which blocks until the server has persisted the data on disk, or until the server has replicated the data to another server (and received its ACK). Overall, the client has various options to trade off data integrity against throughput.

Hashtable Management

To synchronize access to a vBucket hashtable, each incoming thread needs to acquire a lock before accessing a key region of the hashtable.
There are multiple locks per vBucket hashtable, each of which is responsible for controlling exclusive access to a certain key region of that hashtable. The number of regions of a hashtable can grow dynamically as more documents are inserted into it. To control the memory size of the hashtable, the item pager thread monitors the memory utilization of the hashtable. Once a high watermark is reached, it initiates an eviction process to remove certain document content from the hashtable. Only entries that are not referenced by entries in the checkpoint buffer can be evicted, because otherwise the outstanding updates (which exist only in the hashtable and are not yet persisted) would be lost. After eviction, the entry of the document still remains in the hashtable; only the document content is removed from memory, while the metadata stays. The eviction process stops after reaching the low watermark. The high and low watermarks are determined by the bucket memory quota. By default, the high watermark is set to 75% of the bucket quota, while the low watermark is set to 60% of the bucket quota; these watermarks are configurable at runtime. In CouchDB, every document is associated with an expiration time and is deleted once it expires. The expiry pager is responsible for tracking and removing expired documents from both the hashtable and the storage engine (by scheduling a delete operation).

Checkpoint Manager

The checkpoint manager is responsible for recycling the checkpoint buffer, which holds the outstanding update requests consumed by the two downstream processes, the flusher and the TAP replicator. When all the requests in a checkpoint buffer have been processed, the checkpoint buffer is deleted and a new one is created.

TAP Replicator

The TAP replicator is responsible for handling vBucket migration as well as vBucket replication from the active server to the replica server.
It does this by propagating the latest modified documents to the corresponding replica server. When a replica vBucket is first established, the entire vBucket needs to be copied from the active server to the empty destination replica server, as follows:

The in-memory hashtable at the active server is transferred to the replica server. Notice that during this period some data may be updated, and therefore the data set transferred to the replica can be inconsistent (some entries are the latest and some are outdated). Nevertheless, all updates that happen after the start of the transfer are tracked in the checkpoint buffer. Therefore, after the in-memory hashtable transfer is completed, the TAP replicator can pick up those updates from the checkpoint buffer. This ensures the latest versions of changed documents are sent to the replica, and hence fixes the inconsistency.
However, the hashtable cache doesn't contain all the document content, so data also needs to be read from the vBucket file and sent to the replica. Notice that during this period, updates to the vBucket still happen on the active server; however, since the file is append-only, subsequent data updates won't interfere with the vBucket copying process.
After the replica server has caught up, subsequent updates on the active server appear in its checkpoint buffer, which is picked up by the TAP replicator and sent to the replica server.

CouchDB Storage Structure

The data server defines an interface where different storage structures can be plugged in. Currently it supports both a SQLite DB and CouchDB. Here we describe the details of CouchDB, which provides the high-performance storage mechanism underneath the Couchbase technology. Under the CouchDB structure, there is one file per vBucket. Data is written to this file in an append-only manner, which enables Couchbase to do mostly sequential writes for updates and provides the most optimized access patterns for disk I/O.
This unique storage structure contributes to Couchbase's fast on-disk performance for write-intensive applications. The following diagram illustrates the storage model and how it is modified by 3 batch updates (notice that since updates are asynchronous, they are performed by the flusher thread in batches). The flusher thread works as follows:

1) Pick up all pending write requests from the dirty queue and de-duplicate multiple update requests to the same document.
2) Sort the requests (by key) into their corresponding vBuckets and open the corresponding files.
3) Append the following to the vBucket file (in this contiguous sequence):
All document contents in the write-request batch. Each document is written as [length, crc, content], one after another sequentially.
The index that maps document ids to document positions on disk (called the by-id BTree).
The index that maps update sequence numbers to document positions on disk (called the by-seq BTree).

The by-id index plays an important role in looking up a document by its id. It is organized as a B-Tree where each node covers a key range. To look up a document by id, we just start from the header (which is at the end of the file), jump to the root BTree node of the by-id index, and then traverse down to the leaf BTree node that contains the pointer to the actual document position on disk. During a write, a similar mechanism is used to trace back to the BTree nodes that contain the ids of the modified documents. Notice that in the append-only model, updates never happen in place; instead, we locate the existing data and copy it over by appending. In other words, the modified BTree node is copied, modified, and appended to the end of the file, and then its parent needs to be modified to point to the new location, which triggers the parent to be copied over and appended as well.
The same happens to its parent's parent, and so on, all the way up to the root node of the BTree. The disk seeks are of O(logN) complexity.

The by-seq index keeps track of the update sequence of live documents and is used for asynchronous catch-up purposes. When a document is created, modified, or deleted, a sequence number is added to the by-seq BTree and the previous sequence node is deleted. Therefore, for cross-site replication, view index updates, and compaction, we can quickly locate all live documents in the order of their update sequence. When a vBucket replicator asks for the list of updates since a particular time, it provides the last sequence number from the previous update; the system then scans the by-seq BTree nodes to locate all documents with a larger sequence number, which effectively includes all documents that have been modified since the last replication.

As time goes by, certain data becomes garbage (see the greyed-out regions above) and becomes unreachable in the file. Therefore, we need a garbage-collection mechanism to clean up the garbage. To trigger this process, the by-id and by-seq BTree nodes keep track of the data size of the live documents (those that are not garbage) under their subtrees. Therefore, by examining the root BTree node, we can determine the size of all live documents within the vBucket. When the ratio of live size to vBucket file size falls below a certain threshold, a compaction process is triggered, whose job is to open the vBucket file and copy the surviving data to another file. Technically, the compaction process opens the file and reads the by-seq BTree at the end of the file. It traverses the BTree all the way to the leaf nodes and copies the corresponding document content to the new file. Compaction happens while the vBucket is being updated.
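The append-only record layout ([length, crc, content]) and offset-based document lookup described above can be sketched as a toy in Python; `io.BytesIO` stands in for the per-vBucket file, and a flat dict stands in for the by-id B-Tree:

```python
import io
import struct
import zlib

def append_document(f, doc_bytes):
    """Append one [length, crc, content] record and return its starting offset."""
    offset = f.tell()
    f.write(struct.pack(">I", len(doc_bytes)))          # length (4 bytes)
    f.write(struct.pack(">I", zlib.crc32(doc_bytes)))   # crc of the content
    f.write(doc_bytes)                                  # content
    return offset

def read_document(f, offset):
    """Follow a by-id index pointer: seek to the offset and verify the crc."""
    f.seek(offset)
    (length,) = struct.unpack(">I", f.read(4))
    (crc,) = struct.unpack(">I", f.read(4))
    content = f.read(length)
    assert zlib.crc32(content) == crc, "corrupt record"
    return content

f = io.BytesIO()                      # stands in for the per-vBucket file
by_id = {}                            # toy by-id index: doc id -> offset

by_id["doc1"] = append_document(f, b'{"name": "a"}')
# an update appends a new record and repoints the index; the old record
# becomes unreachable garbage, to be reclaimed later by compaction
by_id["doc1"] = append_document(f, b'{"name": "b"}')
```

Both versions of the document still exist in the file; only the index pointer moves, which is exactly why compaction is needed.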
Since the file is append-only, new changes are recorded after the BTree root that the compaction process opened, so subsequent data updates won't interfere with the compaction process. When the compaction is completed, the system needs to copy over the data that was appended since the beginning of the compaction to the new file.

View Index Structure

Unlike most indexing structures, which provide a pointer from the search attribute back to the document, the CouchDB index (called a view index) is better perceived as a denormalized table with arbitrary keys and values loosely associated with the documents. Such a denormalized table is defined by user-provided map() and reduce() functions.

map = function(doc) {
  …
  emit(k1, v1)
  …
  emit(k2, v2)
  …
}

reduce = function(keys, values, isRereduce) {
  if (isRereduce) {
    // Do the re-reduce only on values (keys will be null)
  } else {
    // Do the reduce on keys and values
  }
  // result must be usable as an input value to a re-reduce
  return result
}

Whenever a document is created, updated, or deleted, the corresponding map(doc) function is invoked (in an asynchronous manner) to generate a set of key/value pairs. These key/values are stored in a B-Tree structure. All the key/value pairs of each B-Tree node are passed into the reduce() function, which computes an aggregated value within that B-Tree node. Re-reduce also happens in non-leaf B-Tree nodes, which further aggregate the aggregated values of child B-Tree nodes. The management server maintains the view index and persists it to a separate file. Creating a view index is performed by broadcasting the index-creation request to all machines in the cluster. The management process on each machine then reads its active vBucket files and feeds each surviving document to the map function. The key/value pairs emitted by the map function are stored in a separate BTree index file. When writing out a BTree node, the reduce() function is called with the list of all values in the tree node.
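An executable toy version of the map/reduce view machinery just described can be sketched in Python. A sorted list stands in for the view B-Tree, and the documents, tags, and function names are invented for illustration:

```python
import bisect

def map_fn(doc):
    """User-provided map(): emit one (key, value) pair per tag on the doc."""
    return [(tag, 1) for tag in doc.get("tags", [])]

def reduce_fn(keys, values, is_rereduce):
    """User-provided reduce(): count the pairs; a sum also works for re-reduce."""
    return sum(values)

docs = [{"id": "a", "tags": ["db", "nosql"]},
        {"id": "b", "tags": ["nosql"]},
        {"id": "c", "tags": ["db", "nosql", "json"]}]

# Build the view "index": key/value pairs kept sorted by key
# (a flat stand-in for the view B-Tree described above).
index = sorted(kv for doc in docs for kv in map_fn(doc))

def query(start_key, end_key):
    """Range query with aggregation: reduce all values in [start, end]."""
    lo = bisect.bisect_left(index, (start_key,))
    hi = bisect.bisect_right(index, (end_key, float("inf")))
    in_range = index[lo:hi]
    return reduce_fn([k for k, _ in in_range], [v for _, v in in_range], False)
```

In the real system the reduce values are cached per B-Tree node so a range query can reuse them; this flat sketch recomputes the reduce on every query.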
The reduce() function's return result, representing a partially reduced value, is attached to the BTree node. The view index is updated incrementally as documents subsequently enter the system. Periodically, the management process opens the vBucket file and scans all documents since the last sequence number. For each document changed since the last sync, it invokes the corresponding map function to determine the key/value pairs for the BTree nodes; a BTree node is split when appropriate. Underneath, Couchbase uses a back index to keep track of each document together with the keys it previously emitted. Later, when the document is deleted, Couchbase can look up the back index to determine what those keys are and remove them. In case the document is updated, the back index is also examined; semantically, a modification is equivalent to a delete followed by an insert. The following diagram illustrates how the view index file is incrementally updated via the append-only mechanism.

Query Processing

Queries in Couchbase are made against the view index. A query is composed of the view name, a start key, and an end key. If the reduce() function isn't defined, the query result is the list of values sorted by key within the key range. If the reduce() function is defined, the query result is a single aggregated value over all keys within the key range. If the view has no reduce() function defined, query processing proceeds as follows:

The client issues a query (with the view and start/end keys) to the management process of any server (unlike a key-based lookup, there is no need to locate a specific server).
That management process broadcasts the request to the management processes on all servers (including itself) within the cluster.
Each management process (after receiving the broadcast request) does a local search for values within the key range by traversing the BTree nodes of its view file, and starts sending the results (automatically sorted by key) back to the initial server.
The initial server merges the sorted results and streams them back to the client.

However, if the view has a reduce() function defined, the query processing involves computing a single aggregated value as follows:

The client issues a query (with the view and start/end keys) to the management process of any server (again, there is no need to locate a specific server).
That management process broadcasts the request to the management processes on all servers (including itself) within the cluster.
Each management process does a local reduce over the values within the key range by traversing the BTree nodes of its view file to compute the reduced value of the range. If the key range spans a whole BTree node, the pre-computed value of that sub-range can be reused. This way, the reduce function can reuse many partially reduced values and doesn't need to recompute every value of the key range from scratch.
The original server does a final re-reduce() over the values returned from the other servers, and then passes the final reduced value back to the client.

To illustrate the re-reduce concept, let's say the query's key range runs from A to F. Instead of calling reduce([A,B,C,D,E,F]), the system recognizes that the BTree node containing [B,C,D] has been pre-reduced with its result P stored in the BTree node, so it only needs to call reduce(A,P,E,F).

Update View Index as vBuckets Migrate

Since the view index is synchronized with the vBuckets on the same server, when a vBucket migrates to a different server the view index is no longer correct: the key/values that belong to a migrated vBucket should be discarded, and the reduce values cannot be used anymore.
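The re-reduce arithmetic from the A-to-F example above can be made concrete with a minimal sketch, assuming a sum-based reduce function and made-up leaf values:

```python
def reduce_fn(values):
    """A sum works both as reduce and as re-reduce: partial sums can be summed."""
    return sum(values)

# Pretend a B-Tree node covering keys B..D stored its pre-reduced value:
pre_reduced_BCD = reduce_fn([10, 20, 30])        # values for B, C, D

# Query over A..F: instead of reduce over all six leaf values,
# re-reduce the stored partial together with the remaining leaves:
result = reduce_fn([5, pre_reduced_BCD, 7, 9])   # A, (B..D), E, F
full = reduce_fn([5, 10, 20, 30, 7, 9])          # recomputed from scratch
```

This is why the user's reduce() must return a value that is itself a valid input to a re-reduce: partial results from different B-Tree nodes (or different servers) are combined the same way leaf values are.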
To keep track of the vBuckets behind the keys in the view index, each BTree node has a 1024-bit mask indicating all the vBuckets that are covered in its subtree (ie: it contains a key emitted from a document belonging to that vBucket). This bit mask is maintained whenever the BTree node is updated. At the server level, a global bit mask indicates all the vBuckets that this server is responsible for. When processing a query on a map-only view, before a key/value pair is returned, an extra check is performed for each pair to make sure its associated vBucket is one this server is responsible for. When processing a query on a view that has a reduce() function, we cannot use the pre-computed reduce value if the BTree node contains a vBucket that the server is not responsible for. In this case, the BTree node's bit mask is compared with the global bit mask; if they are not aligned, the reduce value needs to be recomputed. The diagram illustrates this process with an example.

Couchbase is a popular NOSQL technology built on a solid technical foundation designed for high performance. In this post, we have examined a number of its key features:

Load balancing between servers inside a cluster that can grow and shrink according to workload conditions. Data migration can be used to re-establish workload balance.
Asynchronous writes provide the lowest possible latency to the client, returning once the data is stored in memory.
The append-only update model turns most update transactions into sequential disk access, hence providing extremely high throughput for write-intensive applications.
Automatic compaction ensures the data layout on disk is kept optimized at all times.
The map function can be used to pre-compute a view index to enable query access. Summary data can be pre-aggregated using the reduce function. Overall, this cuts down the workload of query processing dramatically.
For a review of NOSQL architecture in general and some theoretical foundations, I have written a NOSQL design-pattern blog, as well as one on some fundamental differences between SQL and NOSQL. For other NOSQL technologies, please read my other blogs on MongoDB, Cassandra and HBase, and Memcached. Special thanks to Damien Katz and Frank Weigel from the Couchbase team, who provided a lot of implementation details of Couchbase.
July 7, 2012
· 83,132 Views · 5 Likes
Recommendation Engine Models
In the classical model of a recommendation system, there are "users" and "items". A user has associated metadata (or content) such as age, gender, race, and other demographic information. Items also have metadata such as text description, price, weight ... etc. On top of that, there are interactions (or transactions) between users and items, such as userA downloads/purchases movieB, or userX gives a rating of 5 to productY ... etc. Now, given all the metadata of users and items, as well as their interactions over time, can we answer the following questions ...

What is the probability that userX purchases itemY?
What rating will userX give to itemY?
What are the top k unseen items that should be recommended to userX?

Content-based Approach

In this approach, we make use of the metadata to categorize users and items and then match them at the category level. One example is recommending jobs to candidates: we can do an IR/text search to match the user's resume with the job descriptions. Another example is recommending an item that is "similar" to one the user has purchased. Similarity is measured according to the item's metadata, and various distance functions can be used. The goal is to find the k nearest neighbors of an item we know the user likes.

Collaborative Filtering Approach

In this approach, we look purely at the interactions between users and items, and use those to perform our recommendation. The interaction data can be represented as a matrix, where each cell represents an interaction between a user and an item. For example, the cell can contain the rating that the user gives to the item (in the case where the cell is a numeric value), or the cell can be just a binary value indicating whether an interaction between the user and item has happened (e.g. a "1" if userX has purchased itemY, and "0" otherwise). The matrix is also extremely sparse, meaning that most of the cells are unfilled. We need to be careful about how we treat these unfilled cells; there are 2 common ways ...
Treat the unknown cells as "0", making them equivalent to the user giving a rating of "0". This may or may not be a good idea depending on your application scenario.
Guess what the missing value should be. For example, to guess what userX would rate itemA given that we know his rating on itemB, we can look at all users (or those in the same age group as userX) who have rated both itemA and itemB, and compute an average rating from them. We can then use the average ratings of itemA and itemB to interpolate userX's rating on itemA given his rating on itemB.

User-based Collaborative Filtering

In this model, we do the following:

Find a group of users that is "similar" to user X
Find all movies liked by this group that haven't been seen by user X
Rank these movies and recommend them to user X

This introduces the concept of user-to-user similarity, which is basically the similarity between 2 row vectors of the user/item matrix. To compute the K nearest neighbors of a particular user, a naive implementation is to compute the "similarity" to all other users and pick the top K. Different similarity functions can be used. The Jaccard similarity is defined as the number of movies both users have seen (the intersection) divided by the number of movies either user has seen (the union). Pearson similarity first normalizes each user's ratings and then computes the cosine distance. There are two problems with this approach:

Comparing userX and userY is expensive, as they have millions of attributes
Finding the top k similar users to userX requires computing the similarity of all pairs involving userX

Locality-Sensitive Hashing and Minhash

To resolve problem 1, we approximate the similarity using a cheap estimation function, called minhash. The idea is to find a hash function h() such that the probability that h(userX) = h(userY) is proportional to the similarity of userX and userY. And if we can find 100 such h() functions, we can just count the number of functions for which h(userX) = h(userY) to determine how similar userX is to userY.
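The two similarity functions just mentioned can be sketched in Python. The representations assumed here are illustrative: a set of seen items per user for Jaccard, and a dict of item-to-rating per user for Pearson.

```python
import math

def jaccard(seen_x, seen_y):
    """Jaccard similarity between the sets of items two users have seen."""
    if not seen_x and not seen_y:
        return 0.0
    return len(seen_x & seen_y) / len(seen_x | seen_y)

def pearson(ratings_x, ratings_y):
    """Pearson similarity: mean-center each user's ratings over the items
    they both rated, then take the cosine of the centered vectors."""
    common = set(ratings_x) & set(ratings_y)
    if not common:
        return 0.0
    mx = sum(ratings_x[i] for i in common) / len(common)
    my = sum(ratings_y[i] for i in common) / len(common)
    num = sum((ratings_x[i] - mx) * (ratings_y[i] - my) for i in common)
    den = (math.sqrt(sum((ratings_x[i] - mx) ** 2 for i in common)) *
           math.sqrt(sum((ratings_y[i] - my) ** 2 for i in common)))
    return num / den if den else 0.0
```

Mean-centering is what makes Pearson robust to users who rate everything high or everything low, which raw cosine similarity is not.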
It would be expensive to actually permute the rows if the number of rows is large. Remember that the purpose of h(c1) is to return the row number of the first row that is 1 under some permutation of the rows. So we can scan each row of c1 to see if it is 1; if so, we apply a function newRowNum = hash(rowNum) to simulate a permutation, and take the minimum of the newRowNum values seen so far. As an optimization, instead of processing one column at a time, we can process one row at a time.

To solve problem 2, we need to avoid computing every other user's similarity to userX. The idea is to hash users into buckets such that similar users will fall into the same bucket. Therefore, instead of comparing against all users, we only compute the similarity of those users who are in the same bucket as userX. The idea is to horizontally partition the columns into b bands, each with r rows. By picking the parameters b and r, we can control the likelihood (as a function of similarity) that two users will fall into the same bucket in at least one band.

Item-based Collaboration Filter

If we transpose the user/item matrix and do the same thing, we can compute the item-to-item similarity. In this model, we do the following:

1. Find the set of movies that user X likes (from the interaction data)
2. Find a group of movies that is similar to this set of movies that we know user X likes
3. Rank these movies and recommend them to user X

It turns out that computing the item-based collaboration filter has more benefit than computing user-to-user similarity, for the following reasons:

- The number of items is typically smaller than the number of users
- While a user's taste changes over time, and hence the similarity matrix needs to be updated more frequently, item-to-item similarity tends to be more stable and requires fewer updates

Singular Value Decomposition

If we look back at the matrix, we can see that matrix multiplication is equivalent to mapping an item from the item space to the user space.
In other words, if we view each existing item as an axis in the user space (notice that each user is a vector of their ratings on existing items), then multiplying a new item with the matrix gives a vector in the same space as the users. We can then compute the dot product of this projected new item with a user to determine its similarity. It turns out this is equivalent to mapping the user into the item space and computing the dot product there. In other words, multiplying by the matrix maps between the item space and the user space.

Now let's imagine there is a hidden concept space in between. Instead of jumping directly from the user space to the item space, we can think of jumping from the user space to a concept space, and then to the item space. Notice that here we first map the user space to the concept space and also map the item space to the concept space, and then match users and items in the concept space. This is a generalization of our recommender.

We can use SVD to factor the matrix into two parts. Let P be an m by n matrix (m rows and n columns). Then P = UDV, where U is an m by m matrix whose columns are the eigenvectors of P*transpose(P), and V is an n by n matrix whose rows are the eigenvectors of transpose(P)*P. D is a diagonal matrix containing the singular values of P (the square roots of the eigenvalues of P*transpose(P), which are also the eigenvalues of transpose(P)*P). We can then split P into the two factors U*squareroot(D) and squareroot(D)*V. Notice that D can be thought of as the strength of each "concept" in the concept space, and its values are ordered by magnitude in decreasing order. If we remove some of the weakest concepts by setting them to zero, we reduce the number of non-zero elements in D, which effectively generalizes the concept space (making it focus on the important concepts).

Calculating the SVD decomposition of a matrix with large dimensions is expensive. Fortunately, if our goal is to compute an SVD approximation (with k non-zero diagonal values), we can use a random projection mechanism.
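The effect of zeroing out the weakest concepts can be sketched with numpy. This is an illustrative sketch with a made-up rating matrix; numpy.linalg.svd returns the singular values already sorted in decreasing order.

```python
import numpy as np

# Toy user/item rating matrix P: 4 users x 5 items, made up for illustration.
# Users 0-1 like the first two items; users 2-3 like the last three.
P = np.array([
    [5, 5, 0, 1, 0],
    [4, 5, 0, 0, 1],
    [0, 1, 5, 4, 4],
    [1, 0, 4, 5, 5],
], dtype=float)

# P = U * D * V: U maps users to concepts, V maps concepts to items,
# and the diagonal of D holds the strength of each concept.
U, d, V = np.linalg.svd(P, full_matrices=False)

# Keep only the k strongest concepts by dropping the weakest singular values.
k = 2
P_k = U[:, :k] @ np.diag(d[:k]) @ V[:k, :]

print(d)                 # concept strengths, in decreasing order
print(np.round(P_k, 1))  # rank-k approximation of P
```

Because the two groups of users have clearly separated tastes, the rank-2 approximation already recovers most of the structure of P.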
Association Rule Based

In this model, we use the market-basket association rule algorithm to discover rules like {item1, item2} => {item3, item4, item5}. We represent each user as a basket and each viewing as an item (notice that we ignore the rating and use a binary value). After that, we use an association rule mining algorithm to detect the frequent itemsets and the association rules. Then, for each user, we match the user's previously viewed items against the set of rules to determine what other movies we should recommend.

Evaluate the Recommender

After we have a recommender, how do we evaluate its performance? The basic idea is to separate the data into a training set and a test set. For the test set, we remove certain user-to-movie interactions (change certain cells from 1 to 0), pretending the user hasn't seen the item. Then we use the training set to train a recommender, and fit the test set (with the removed interactions) to it. The performance is measured by how much overlap there is between the recommended items and the ones we removed. In other words, a good recommender should be able to recover the set of items that we removed from the test set.

Leverage Tagging Information on Items

In some cases, items have explicit tags associated with them (we can consider the tags a user-annotated concept space added to the items). Consider each item to be described by a vector of tags. Users can then be auto-tagged based on the items they have interacted with. For example, if userX purchases itemY, which is tagged with Z1 and Z2, then userX's weights for tags Z1 and Z2 increase in her existing tag vector. We can use a time-decay mechanism to update the user's tag vector as follows:

current_user_tag = alpha * item_tag + (1 - alpha) * prev_user_tag

To recommend an item to the user, we simply calculate the top k items by computing the dot product (i.e. cosine distance) of the user's tag vector and each item's tag vector.
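The decay update and the dot-product ranking above can be sketched as follows. The alpha value, the tag names and the toy catalog are all made up for illustration.

```python
import math

ALPHA = 0.3  # decay factor; an assumed value for illustration

def update_user_tags(user_tags, item_tags, alpha=ALPHA):
    # current_user_tag = alpha * item_tag + (1 - alpha) * prev_user_tag,
    # applied per tag over the union of both tag vectors.
    tags = set(user_tags) | set(item_tags)
    return {t: alpha * item_tags.get(t, 0.0) + (1 - alpha) * user_tags.get(t, 0.0)
            for t in tags}

def cosine(u, v):
    # Dot product of two sparse tag vectors, normalized by their lengths.
    dot = sum(u.get(t, 0.0) * v.get(t, 0.0) for t in set(u) | set(v))
    nu = math.sqrt(sum(x * x for x in u.values()))
    nv = math.sqrt(sum(x * x for x in v.values()))
    return dot / (nu * nv) if nu and nv else 0.0

user = {"scifi": 1.0}
item = {"scifi": 1.0, "thriller": 1.0}  # the item the user just purchased
user = update_user_tags(user, item)
print(user)  # scifi stays strong, thriller is introduced at 0.3

catalog = {"movieA": {"scifi": 1.0}, "movieB": {"romance": 1.0}}
ranked = sorted(catalog, key=lambda m: cosine(user, catalog[m]), reverse=True)
print(ranked)  # ['movieA', 'movieB']
```

Repeated purchases keep folding new item tags into the user vector while older interests decay geometrically, which is exactly what the alpha blend buys you.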
Source: http://horicky.blogspot.com/2011/09/recommendation-engine.html
November 2, 2011
Map Reduce and Stream Processing
The Hadoop Map/Reduce model is very good at processing large amounts of data in parallel. It provides a general partitioning mechanism (based on the key of the data) to distribute the aggregation workload across different machines. Basically, map/reduce algorithm design is all about how to select the right key for the record at each stage of processing.

However, the "time dimension" has a very different characteristic compared to other dimensional attributes of data, especially when real-time data processing is concerned. It presents a different set of challenges to the batch-oriented Map/Reduce model:

- Real-time processing demands a very low latency of response, which means there isn't much data accumulated along the "time" dimension for processing.
- Data collected from multiple sources may not have all arrived at the point of aggregation.
- In the standard model of Map/Reduce, the reduce phase cannot start until the map phase is completed, and all the intermediate data is persisted on disk before being downloaded to the reducer. All of this adds significant latency to the processing.

Although Hadoop Map/Reduce is designed for batch-oriented workloads, certain applications, such as fraud detection, ad display and network monitoring, require real-time responses while processing large amounts of data, and people have started looking at various ways of tweaking Hadoop to fit a more real-time processing environment. Here I try to look at some techniques for performing low-latency parallel processing based on the Map/Reduce model.

General Stream Processing Model

In this model, data is produced by various OLTP systems, which update the transaction data store and also asynchronously send additional data for analytic processing. The analytic processing writes its output to a decision model, which feeds information back to the OLTP systems for real-time decision making.
Notice the "asynchronous" nature of the analytic processing, which is decoupled from the OLTP systems; this way, the OLTP systems won't be slowed down waiting for the completion of the analytic processing. Nevertheless, we still need to perform the analytic processing ASAP, otherwise the decision model will not be very useful, since it won't reflect the current picture of the world. What latency is tolerable is application specific.

Micro-batch in Map/Reduce

One approach is to cut the data into small batches based on a time window (e.g. every hour) and submit the data collected in each batch to a Map/Reduce job. A staging mechanism is needed so that the OLTP application can continue independently of the analytic processing. A job scheduler is used to regulate the producer and the consumer so each of them can proceed independently.

Continuous Map/Reduce

Here let's imagine some possible modifications of the Map/Reduce execution model to cater to real-time stream processing. I am not trying to worry about backward compatibility with Hadoop, which is the approach that the Hadoop Online Prototype (HOP) is taking.

Long Running

The first modification is to make the mapper and reducer long-running. Therefore, we cannot wait for the end of the map phase before starting the reduce phase, as the map phase never ends. This implies that the mapper pushes data to the reducer as soon as it completes its processing, and lets the reducer sort the data. A downside of this approach is that it offers no opportunity to run the combine() function on the map side to reduce bandwidth utilization. It also shifts more workload to the reducer, which now needs to do the sorting.

Notice there is a tradeoff between latency and optimization. Optimization requires more data to be accumulated at the source (i.e. the mapper) so that local consolidation (i.e. combine) can be performed. Unfortunately, low latency requires the data to be sent ASAP, so not much accumulation can be done.
HOP suggests an adaptive flow control mechanism such that data is pushed out to the reducer ASAP until the reducer is overloaded and pushes back (using some sort of flow control protocol). Then the mapper buffers the processed messages and performs combine() before sending them to the reducer. This approach automatically shifts the aggregation workload back and forth between the reducer and the mapper.

Time Window: Slice and Range

There is a "time slice" concept and a "time range" concept. A "slice" defines a time window in which results are accumulated before the reduce processing is executed. This is also the minimum amount of data that the mapper should accumulate before sending it to the reducer. A "range" defines the time window over which results are aggregated. It can be a landmark window, which has a well-defined starting point, or a jumping window (consider a moving landmark scenario). It can also be a sliding window, where a fixed-size window ending at the current time is aggregated.

After receiving a specific time slice from every mapper, the reducer can start the aggregation processing and combine the result with the previous aggregation result. The slice can be dynamically adjusted based on the amount of data sent from the mappers.

Incremental Processing

Notice that the reducer needs to compute the aggregated slice value after receiving all records of the same slice from all mappers. After that, it calls the user-defined merge() function to merge the slice value into the range value. In case the range needs to be refreshed (e.g. on reaching a jumping window boundary), the init() function is called to get a fresh range value. If the range value needs to be updated (when a certain slice value falls outside a sliding range), the unmerge() function is invoked.

Here is an example of how we keep track of the average hit rate (i.e. total hits per hour) within a 24-hour sliding window, with updates happening per hour (i.e. a one-hour slice).
# Call at each hit record
map(k1, hitRecord) {
  site = hitRecord.site
  # Lookup the slice of the particular key
  slice = lookupSlice(site)
  if (now - slice.time > 60.minutes) {
    # Notify reducer whole slice of site is sent
    advance(site, slice)
    slice = lookupSlice(site)
  }
  emitIntermediate(site, slice, 1)
}

combine(site, slice, countList) {
  hitCount = 0
  for count in countList {
    hitCount += count
  }
  # Send the message to the downstream node
  emitIntermediate(site, slice, hitCount)
}

# Called when the reducer receives a full slice from all mappers
reduce(site, slice, countList) {
  hitCount = 0
  for count in countList {
    hitCount += count
  }
  sv = SliceValue.new
  sv.hitCount = hitCount
  return sv
}

# Called at each jumping window boundary
init(slice) {
  rangeValue = RangeValue.new
  rangeValue.hitCount = 0
  return rangeValue
}

# Called after each reduce()
merge(rangeValue, slice, sliceValue) {
  rangeValue.hitCount += sliceValue.hitCount
}

# Called when a slice falls out of the sliding window
unmerge(rangeValue, slice, sliceValue) {
  rangeValue.hitCount -= sliceValue.hitCount
}
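Outside the Map/Reduce skeleton, the init/merge/unmerge lifecycle for this sliding window boils down to a small amount of state. Here is an illustrative Python sketch (the class name and numbers are assumptions, following the 24-hour window with one-hour slices described above):

```python
from collections import deque

WINDOW_SLICES = 24  # a 24-hour sliding range made of one-hour slices

class SlidingHitRate:
    # Holds the "range value": merge() each newly reduced slice value,
    # and unmerge the slice that falls out of the sliding window.
    def __init__(self):               # the init() step
        self.slices = deque()         # per-hour hit counts in the window
        self.total = 0                # the aggregated range value

    def merge(self, slice_hits):      # the merge() step
        self.slices.append(slice_hits)
        self.total += slice_hits
        if len(self.slices) > WINDOW_SLICES:
            self.total -= self.slices.popleft()   # the unmerge() step

    def hits_per_hour(self):
        return self.total / len(self.slices)

w = SlidingHitRate()
for hits in [100] * 24 + [148]:   # 25 hourly slices; the last hour is busier
    w.merge(hits)
print(w.total)            # 23 * 100 + 148 = 2448
print(w.hits_per_hour())  # 102.0
```

The point of unmerge() is visible here: the oldest slice is subtracted rather than re-aggregating the whole window from scratch.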
November 23, 2010
Scalable System Design Patterns
Looking back 2.5 years after my previous post on scalable system design techniques, I've observed the emergence of a set of commonly used design patterns. Here is my attempt to capture and share them.

Load Balancer

In this model, there is a dispatcher that determines which worker instance will handle a request, based on different policies. The application should ideally be "stateless" so that any worker instance can handle the request. This pattern is deployed in almost every medium to large website setup.

Scatter and Gather

In this model, the dispatcher multicasts the request to all workers in the pool. Each worker computes a local result and sends it back to the dispatcher, which consolidates them into a single response that is then sent back to the client. This pattern is used by search engines like Yahoo and Google to handle users' keyword search requests.

Result Cache

In this model, the dispatcher first looks up whether the request has been made before and tries to find the previous result to return, in order to save the actual execution. This pattern is commonly used in large enterprise applications; Memcached is a very commonly deployed cache server.

Shared Space

This model is also known as "Blackboard": all workers monitor information from the shared space and contribute partial knowledge back to the blackboard. The information is continuously enriched until a solution is reached. This pattern is used in JavaSpaces and also in the commercial product GigaSpaces.

Pipe and Filter

This model is also known as "Data Flow Programming": all workers are connected by pipes through which data flows. This is a very common EAI pattern.

Map Reduce

This model targets batch jobs where disk I/O is the major bottleneck. It uses a distributed file system so that disk I/O can be done in parallel. This pattern is used in many of Google's internal applications, and is also implemented in the open source Hadoop parallel processing framework.
I also find this pattern applicable to many application design scenarios.

Bulk Synchronous Parallel

This model is based on lock-step execution across all workers, coordinated by a master. Each worker repeats the following steps until the exit condition is reached, when there are no more active workers:

1. Each worker reads data from its input queue
2. Each worker performs local processing based on the data read
3. Each worker pushes its local result along its direct connections

This pattern has been used in Google's Pregel graph processing model as well as the Apache Hama project.

Execution Orchestrator

This model is based on an intelligent scheduler/orchestrator that schedules ready-to-run tasks (based on a dependency graph) across a cluster of dumb workers. This pattern is used in Microsoft's Dryad project.

Although I have tried to cover the whole set of commonly used design patterns for building large-scale systems, I am sure I have missed some important ones. Please drop me a comment with feedback. Also, there is a whole set of scalability patterns around the data tier that I haven't covered here, including some very basic patterns underlying NoSQL, and it is worth taking a deep look at some of the leading implementations.
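As a small illustration of the scatter and gather pattern described above, here is a sketch (the shards and keyword counts are made up) where a dispatcher multicasts a query to workers that each hold one index shard, then consolidates the local results:

```python
from concurrent.futures import ThreadPoolExecutor

# Each worker holds one shard of a keyword index (made-up data).
shards = [
    {"erlang": 3, "rest": 1},
    {"rest": 5, "hadoop": 2},
    {"erlang": 1, "hadoop": 4},
]

def local_search(shard, keyword):
    # Each worker computes a local result over its own shard.
    return shard.get(keyword, 0)

def scatter_gather(keyword):
    # The dispatcher multicasts the request to all workers in the pool,
    # then consolidates the local results into a single response.
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        local_results = pool.map(lambda s: local_search(s, keyword), shards)
        return sum(local_results)

print(scatter_gather("erlang"))  # 3 + 0 + 1 = 4
print(scatter_gather("rest"))    # 1 + 5 + 0 = 6
```

The same shape (broadcast, compute locally, consolidate) applies whether the workers are threads, processes, or machines behind a dispatcher.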
October 18, 2010
Exploring Erlang with Map/Reduce
Under the category of "concurrency-oriented programming", Erlang has gotten some good attention recently due to declared successes from Facebook engineers using Erlang in large-scale applications. Tempted to figure out the underlying ingredients of Erlang, I decided to spend some time learning the language.

Multi-threading Problem

Multiple threads of execution are a common programming model in modern languages because they enable a more efficient use of the computing resources provided by multi-core and multi-machine architectures. One question to be answered, though, is how these parallel threads of execution interact and cooperate to solve the application problem.

There are basically two models for communication between concurrent executions. One is based on a "shared memory" model, in which one thread of execution writes information to a shared place from which other threads read. Java's thread model is based on such "shared memory" semantics. The typical problem of this model is that concurrent updates require a very sophisticated protection scheme, otherwise uncoordinated access can result in inconsistent data. Unfortunately, this protection scheme is very hard to analyze once multiple threads start to interact in a combinatorially explosive number of different ways, and hard-to-debug deadlock problems frequently pop up. To reduce the complexity, a coarse-grained locking model is usually recommended, but this may reduce concurrency.

Erlang picked the other model, based on "message passing". In this model, any information that needs to be shared is "copied" into a message and sent to other executions. Each thread of execution keeps its state completely local (not viewable by other threads of execution); its local state is updated when it learns what is going on in other threads by receiving their messages. This model mirrors how people interact in real life.
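Erlang's message-passing model can be approximated (loosely; Python threads are far heavier than Erlang processes, so this is an analogy, not a translation) with a thread that owns a mailbox and keeps all of its state local:

```python
import threading
import queue

def counter_process(mailbox):
    # All state is completely local to this "process"; other threads can
    # only influence it by sending messages to its mailbox.
    count = 0
    while True:
        msg = mailbox.get()
        if msg[0] == "increment":
            count += msg[1]          # the value arrived as a copy
        elif msg[0] == "get":
            msg[1].put(count)        # reply through the caller's queue
        elif msg[0] == "stop":
            return

mailbox = queue.Queue()
threading.Thread(target=counter_process, args=(mailbox,), daemon=True).start()

mailbox.put(("increment", 3))        # asynchronous send: returns immediately
mailbox.put(("increment", 4))
reply = queue.Queue()
mailbox.put(("get", reply))
total = reply.get()                  # synchronous receive
print(total)                         # 7
mailbox.put(("stop",))
```

Because only the owning thread ever touches `count`, no locks are needed; all coordination happens through the message queue.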
Erlang Sequential Processing

Coming from an object-oriented, imperative programming background, there were a couple of things I needed to unlearn/learn in Erlang.

Erlang is a functional programming language and has no OO concepts. Erlang code is structured with the "function" as the basic unit, grouped under a "module". Each function takes a number of input parameters and produces an output value. Like many functional programming languages, Erlang encourages the use of "pure functions", which are side-effect-free and deterministic. "Side-effect-free" means there are no state changes within the execution of the function. "Deterministic" means the same output will always be produced from the same input.

Erlang has a very different concept of variable assignment in that all variables in Erlang are immutable. In other words, every variable can only be assigned once and from then on can never be changed. So I cannot do X = X + 1; I have to use a new variable and assign it the changed value, e.g. Y = X + 1. This immutability simplifies debugging a lot, because I don't need to worry about how the value of X changes at different points of execution (it simply won't change).

Another uncommon thing about Erlang is that there is no "while loop" construct in the language. To achieve a looping effect, you need to write the function recursively, basically putting in a terminal clause to check for the exit condition, as well as carefully structuring the logic in a tail-recursive fashion; otherwise, you may run out of memory if the stack grows too much. A tail-recursive function either returns a value (not an expression) or a recursive function call in tail position. Erlang is smart enough to do tail recursion across multiple functions, such as when funcA calls funcB, which calls funcC, which calls funcA. Tail recursion is especially important when writing server daemons, which typically make a self-recursive call after processing a request.
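To make the tail-recursion point concrete in a more familiar syntax, here is a Python sketch. Python itself does not eliminate tail calls, so this illustrates the shape of the technique, not Erlang's actual optimization:

```python
def sum_to(n, acc=0):
    # Tail-recursive shape: the recursive call is the entire return value,
    # with the running result carried in an accumulator argument.
    if n == 0:
        return acc                 # terminal clause: the exit condition
    return sum_to(n - 1, acc + n)  # tail position: nothing left to do after

def sum_to_loop(n):
    # What a tail-call-optimizing runtime effectively turns the above into:
    # a loop that reuses a single stack frame.
    acc = 0
    while n > 0:
        acc, n = acc + n, n - 1
    return acc

print(sum_to(100))         # 5050
print(sum_to_loop(10**6))  # 500000500000, with no risk of stack overflow
```

In Erlang, a server loop written in the first style runs in constant stack space, which is why the self-recursive "receive a request, then call yourself" daemon pattern is safe.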
Erlang Parallel Processing

The thread of execution in Erlang is called a "process". Don't confuse these with OS-level processes: Erlang processes are extremely lightweight, much lighter than Java threads. A process is created by a spawn(Node, Module, Function, Arguments) function call, and it terminates when that function returns.

Erlang processes communicate with each other by passing messages. Process ids are used by the sender to specify the recipient's address. The send call happens asynchronously and returns immediately. The receiving process makes a synchronous receive call and specifies a number of matching patterns. Arriving messages that match a pattern are delivered to the receiving process; otherwise, they stay in the queue forever. Therefore, it is good practice to have a match-all pattern to clean up garbage messages. The receive call also accepts a timeout parameter so that it returns if no matching message arrives within the timeout period.

Error handling in Erlang is also quite different from other programming languages. Although Erlang provides a try/catch model, it is not the preferred approach. Instead of catching the error and handling it within the local process, the process should simply die and let another process take care of what should be done after the crash. Erlang has the concept of processes being "linked" to each other to monitor each other's life status. In the default setting, a dying process propagates an exit signal to all the processes linked to it (links are bi-directional), so there is a chaining effect: when one process dies, the whole chain of processes dies. However, a process can redefine its behavior after receiving an exit signal. Instead of dying, it can choose to handle the error (perhaps by restarting the dead process).
Other Erlang Features

Pattern matching is a common programming construct in many places in Erlang, namely function calls, variable assignment, case statements and message receives. It takes some time to get used to this style; after that, I found the construct very powerful.

Another cool feature that Erlang provides is code hot swapping. By specifying the module name when making a function call, a running Erlang process can execute the latest code without restarting itself. This is a powerful feature for code evolution, because you don't need to shut down the VM when deploying new code.

Since a function itself can be passed as a message to a remote process, executing code remotely is extremely easy in Erlang. The problems of installation and deployment are pretty much non-existent in Erlang.

Map/Reduce Using Erlang

After learning the basic concepts, my next step was to find a problem and get some hands-on experience with the language. Based on its work-partitioning, aggregation, parallel-processing model, Map/Reduce aligns very nicely with Erlang's parallel processing model, so I picked as my project the implementation of a simple Map/Reduce framework in Erlang.

Here is the Erlang implementation. First of all, I need some helper functions:

-module(mapreduce).
-export([reduce_task/2, map_task/2, test_map_reduce/0]).

%%% Execute the function N times,
%%% and put the result into a list
repeat_exec(N, Func) ->
  lists:map(Func, lists:seq(0, N-1)).

%%% Identify the reducer process by
%%% using the hashcode of the key
find_reducer(Processes, Key) ->
  Index = erlang:phash(Key, length(Processes)),
  lists:nth(Index, Processes).

%%% Identify the mapper process by random
find_mapper(Processes) ->
  case random:uniform(length(Processes)) of
    0 -> find_mapper(Processes);
    N -> lists:nth(N, Processes)
  end.

%%% Collect result synchronously from
%%% a reducer process
collect(Reduce_proc) ->
  Reduce_proc ! {collect, self()},
  receive
    {result, Result} -> Result
  end.

Main Function

The map_reduce() function is the entry point of the system:

1. It first starts all R reducer processes
2. It starts all M mapper processes, passing them the R reducer process ids
3. For each line of input data, it randomly picks one of the M mapper processes and sends the line to it
4. It waits until the processing has finished
5. It collects the results from the R reducer processes
6. It returns the collected results

The corresponding Erlang code is as follows:

%%% The entry point of the map/reduce framework
map_reduce(M, R, Map_func, Reduce_func, Acc0, List) ->
  %% Start all the reducer processes
  Reduce_processes =
    repeat_exec(R,
      fun(_) ->
        spawn(mapreduce, reduce_task, [Acc0, Reduce_func])
      end),
  io:format("Reduce processes ~w are started~n", [Reduce_processes]),

  %% Start all mapper processes
  Map_processes =
    repeat_exec(M,
      fun(_) ->
        spawn(mapreduce, map_task, [Reduce_processes, Map_func])
      end),
  io:format("Map processes ~w are started~n", [Map_processes]),

  %% Send the data to the mapper processes
  Extract_func =
    fun(N) ->
      Extracted_line = lists:nth(N+1, List),
      Map_proc = find_mapper(Map_processes),
      io:format("Send ~w to map process ~w~n", [Extracted_line, Map_proc]),
      Map_proc ! {map, Extracted_line}
    end,
  repeat_exec(length(List), Extract_func),
  timer:sleep(2000),

  %% Collect the result from all reducer processes
  io:format("Collect all data from reduce processes~n"),
  All_results =
    repeat_exec(length(Reduce_processes),
      fun(N) ->
        collect(lists:nth(N+1, Reduce_processes))
      end),
  lists:flatten(All_results).

Map Process

The map processes, once started, perform the following:

1. Receive the input line
2. Execute the user-provided map function to turn it into a list of (key, value) pairs
3. For each key and value, select a reducer process and send the (key, value) pair to it

The corresponding Erlang code is as follows:
%%% The mapper process
map_task(Reduce_processes, MapFun) ->
  receive
    {map, Data} ->
      IntermediateResults = MapFun(Data),
      io:format("Map function produce: ~w~n", [IntermediateResults]),
      lists:foreach(
        fun({K, V}) ->
          Reducer_proc = find_reducer(Reduce_processes, K),
          Reducer_proc ! {reduce, {K, V}}
        end, IntermediateResults),
      map_task(Reduce_processes, MapFun)
  end.

Reduce Process

On the other hand, the reducer processes execute as follows:

1. Receive the (key, value) pair from the mapper process
2. Get the current accumulated value for the key; if no accumulated value is found, use the initial accumulated value
3. Invoke the user-provided reduce function to calculate the new accumulated value
4. Store the new accumulated value under the key

The corresponding Erlang code is as follows:

%%% The reducer process
reduce_task(Acc0, ReduceFun) ->
  receive
    {reduce, {K, V}} ->
      Acc = case get(K) of
        undefined -> Acc0;
        Current_acc -> Current_acc
      end,
      put(K, ReduceFun(V, Acc)),
      reduce_task(Acc0, ReduceFun);
    {collect, PPid} ->
      PPid ! {result, get()},
      reduce_task(Acc0, ReduceFun)
  end.

Word Count Example

To test the Map/Reduce framework, here is a word count example:

%%% Testing of Map reduce using word count
test_map_reduce() ->
  M_func = fun(Line) ->
    lists:map(
      fun(Word) -> {Word, 1} end,
      Line)
  end,
  R_func = fun(V1, Acc) -> Acc + V1 end,
  map_reduce(3, 5, M_func, R_func, 0,
    [[this, is, a, boy],
     [this, is, a, girl],
     [this, is, lovely, boy]]).

This is the result of executing the test program:

Erlang (BEAM) emulator version 5.6.1 [smp:2] [async-threads:0]

Eshell V5.6.1  (abort with ^G)
1> c(mapreduce).
{ok,mapreduce}
2> mapreduce:test_map_reduce().
Reduce processes [<0.37.0>,<0.38.0>,<0.39.0>,<0.40.0>,<0.41.0>] are started
Map processes [<0.42.0>,<0.43.0>,<0.44.0>] are started
Send [this,is,a,boy] to map process <0.42.0>
Send [this,is,a,girl] to map process <0.43.0>
Map function produce: [{this,1},{is,1},{a,1},{boy,1}]
Send [this,is,lovely,boy] to map process <0.44.0>
Map function produce: [{this,1},{is,1},{a,1},{girl,1}]
Map function produce: [{this,1},{is,1},{lovely,1},{boy,1}]
Collect all data from reduce processes
[{is,3},{this,3},{boy,2},{girl,1},{a,2},{lovely,1}]
3>

Summary

From this exercise of implementing a simple Map/Reduce model using Erlang, I found that Erlang is very powerful for developing distributed systems.
June 15, 2008
Common REST Design Pattern
Based on the same architectural pattern as the web, "REST" has gained a growing dominance in SOA (Service Oriented Architecture) implementations these days. In this article, we will discuss some basic design principles of REST.

SOAP: The Remote Procedure Call Model

Before REST became dominant, most SOA architectures were built around the WS* stack, which is fundamentally an RPC (Remote Procedure Call) model. Under this model, a "service" is structured as a set of "procedures" exposed by the system. For example, WSDL is used to define the procedure call syntax (such as the procedure name and the parameters and their structure). SOAP is used to define how to encode the procedure call into an XML string. And there are other WS* standards that define higher-level protocols, such as how to pass security credentials around, how to make transactional procedure calls, how to discover the service location, etc. Unfortunately, the WS* stack has gotten so complicated that it takes a steep learning curve before it can be used, while on the other hand it has not achieved its original goal of interoperability (probably due to different interpretations of what the spec says). In the last 2 years, WS* technology development has slowed down and the momentum has shifted to another model: REST.

REST: The Resource-Oriented Model

REST (REpresentational State Transfer) was introduced by Roy Fielding when he captured the basic architectural pattern that makes the web so successful. Observing how web pages are organized and how they link to each other, REST is modeled around a large number of "resources" which "link" among each other. As a significant difference from WS*, REST raises the importance of "resources" and their "linkage", and on the other hand pushes down the importance of "procedures". Under the REST model, a "service" in the SOA is organized as a large number of "resources". Each resource has a URI that makes it globally identifiable.
A resource is represented by some "representation" format, which is typically retrieved by an idempotent HTTP GET. The representation may embed other URIs which refer to other resources. This emulates an HTML link between web pages and provides a powerful way for the client to discover other services by traversing links. It also makes building an SOA search engine possible.

On the other hand, REST downplays the "procedure" aspect and defines a small number of "actions" based on the existing HTTP methods. As we discussed above, HTTP GET is used to get a representation of the resource. To modify a resource, REST uses HTTP PUT with the new representation embedded inside the HTTP body. To delete a resource, REST uses HTTP DELETE. To get the metadata of a resource, REST uses HTTP HEAD. Notice that in all these cases, the HTTP body doesn't carry any information about the "procedure". This is quite different from WS* SOAP, where the request is always made using HTTP POST.

At first glance, it seems REST is quite limited in the number of procedures it can support. It turns out this is not the case: REST allows any "procedure" (one that has a side effect) to use HTTP POST. Effectively, REST categorizes operations by their nature and associates well-defined semantics with these categories (i.e. GET for read-only, PUT for update, DELETE for removal, all of the above being idempotent), while providing an extension mechanism for application-specific operations (i.e. POST for application procedures, which may be non-idempotent).

URI Naming Convention

Since a resource is usually mapped to some state in the system, analyzing its lifecycle is an important step when designing how a resource is created and how its URI should be structured. Typically there are some eternal, singleton "factory resources" which create other resources. A factory resource typically represents the "type" of a resource.
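The verb categorization above can be captured in a small table. This is an illustrative sketch of the semantics, not code from the original article:

```python
# REST associates well-defined semantics with each HTTP method; POST is
# the extension point for application-specific, possibly non-idempotent
# procedures.
VERB_SEMANTICS = {
    "GET":    {"action": "read a representation",              "idempotent": True},
    "HEAD":   {"action": "read resource metadata",             "idempotent": True},
    "PUT":    {"action": "update (or create at a known URI)",  "idempotent": True},
    "DELETE": {"action": "remove the resource",                "idempotent": True},
    "POST":   {"action": "application-specific procedure",     "idempotent": False},
}

def is_safe_to_retry(method):
    # Idempotent requests can be retried after a timeout without changing
    # the outcome; a POST generally cannot be retried blindly.
    return VERB_SEMANTICS[method]["idempotent"]

print(is_safe_to_retry("PUT"))   # True
print(is_safe_to_retry("POST"))  # False
```

This is one practical payoff of the categorization: intermediaries and client libraries can make retry and caching decisions from the method alone, without understanding the application.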
A factory resource usually has a static, well-known URI, suffixed with the plural form of the resource type. Some examples are:

    http://xyz.com/books
    http://xyz.com/users

A "Resource Instance", created by a "Factory Resource", represents an instance of that resource type. Resource instances typically have a limited life span, and their URIs typically contain a unique identifier so that the corresponding instance can be located. Some examples are:

    http://xyz.com/books/4545
    http://xyz.com/users/123

A "Dependent Resource" is typically created and owned by an existing resource during part of its life cycle, so it has an implicit life-cycle dependency on its owning parent: when a parent resource is deleted, all the dependent resources it owns are deleted automatically. A dependent resource uses a URI prefixed with its parent resource's URI. Some examples are:

    http://xyz.com/books/4545/tableofcontent
    http://xyz.com/users/123/shopping_cart

Creating Resource

To create a resource instance of a particular resource type, make an HTTP POST to the factory resource's URI. If the creation is successful, the response contains the URI of the resource that has been created.

To create a book ...

    POST /books HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {book XML, author "Ricky Ho"}

    HTTP/1.1 201 Created
    Content-Type: application/xml; charset=utf-8
    Location: /books/4545

    http://xyz.com/books/4545

To create a dependent resource, make an HTTP POST to its owning resource's URI.

To upload the content of a book (using HTTP POST) ...

    POST /books/4545 HTTP/1.1
    Host: example.org
    Content-Type: application/pdf
    Content-Length: nnnn

    {pdf data}

    HTTP/1.1 201 Created
    Content-Type: application/pdf
    Location: /books/4545/content

    http://xyz.com/books/4545/tableofcontent

HTTP POST is typically used to create a resource when its URI is unknown to the client before its creation.
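The three URI conventions (factory, instance, dependent) can be expressed as a few helper functions. The base URL and resource names are the article's own examples; the helper functions themselves are hypothetical.

```python
BASE = "http://xyz.com"

def factory_uri(resource_type):
    # Factory resource: a static, well-known URI suffixed with the
    # plural form of the resource type.
    return f"{BASE}/{resource_type}"

def instance_uri(resource_type, instance_id):
    # Resource instance: the factory URI plus a unique identifier.
    return f"{factory_uri(resource_type)}/{instance_id}"

def dependent_uri(parent_uri, name):
    # Dependent resource: prefixed with its owning parent's URI, so the
    # life-cycle dependency is visible in the URI itself.
    return f"{parent_uri}/{name}"

books = factory_uri("books")                # http://xyz.com/books
book = instance_uri("books", 4545)          # http://xyz.com/books/4545
toc = dependent_uri(book, "tableofcontent") # http://xyz.com/books/4545/tableofcontent
```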
However, if the URI is known to the client, an idempotent HTTP PUT should be made to the URI of the resource to be created.

To upload the content of a book (using HTTP PUT) ...

    PUT /books/4545/content HTTP/1.1
    Host: xyz.com
    Content-Type: application/pdf
    Content-Length: nnnn

    {pdf data}

    HTTP/1.1 201 Created
    Content-Type: application/pdf
    Location: /books/4545/content

    http://xyz.com/books/4545/tableofcontent

Repeating the same PUT simply replaces the content and returns:

    HTTP/1.1 200 OK

Finding Resources

Make an HTTP GET to the factory resource's URI, passing the search criteria as query parameters. (Note that it is up to the factory resource to interpret the query parameters.)

To search for books by a certain author ...

    GET /books?author=Ricky HTTP/1.1
    Host: xyz.com

    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {result XML listing http://xyz.com/books/4545 and http://xyz.com/books/4546, both by "Ricky"}

Another school of thought is to embed the criteria in the URI path, such as ...

    http://xyz.com/books/author/Ricky

I personally prefer the query-parameter mechanism because it doesn't imply any order among the search criteria.

Lookup a particular resource

Make an HTTP GET to the resource's URI.

To look up a particular book ...

    GET /books/4545 HTTP/1.1
    Host: xyz.com

    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {book XML, author "Ricky Ho"}

In case the resource has multiple representation formats, the client should specify the format it expects in the "Accept" header of its request.

Lookup a dependent resource

Make an HTTP GET to the dependent resource's URI.

To download the table of contents of a particular book ...

    GET /books/4545/tableofcontent HTTP/1.1
    Host: xyz.com

    HTTP/1.1 200 OK
    Content-Type: application/pdf
    Content-Length: nnn

    {pdf data}

Modify a resource

Make an HTTP PUT to the resource's URI, passing the new representation in the HTTP body.

To change the book title ...
    PUT /books/4545 HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {book XML with the new title, author "Ricky Ho"}

    HTTP/1.1 200 OK

Delete a resource

Make an HTTP DELETE to the resource's URI.

To delete a book ...

    DELETE /books/4545 HTTP/1.1
    Host: xyz.com

    HTTP/1.1 200 OK

Resource Reference

In some cases we do not want to create a new resource but to add a "reference" to an existing one; consider, for example, a book being added to a shopping cart, which is itself another resource.

Add a book to the shopping cart ...

    POST /users/123/shopping_cart HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {reference XML containing http://xyz.com/books/4545}

    HTTP/1.1 200 OK

Show all items in the shopping cart ...

    GET /users/123/shopping_cart HTTP/1.1
    Host: xyz.com

    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {cart XML listing http://xyz.com/books/4545 ...}

Note that the shopping cart resource contains "resource references" that act as links to other resources (the books). Such linkage creates a web of resources that a client can discover and navigate across.

Remove a book from the shopping cart ...

    POST /users/123/shopping_cart HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {reference XML containing http://xyz.com/books/4545}

    HTTP/1.1 200 OK

Note that we use HTTP POST rather than HTTP DELETE to remove a resource reference, because we are removing a link, not the actual resource itself. In this case, the book still exists after it is taken out of the shopping cart.

Checkout the shopping cart ...
    POST /orders HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {order XML referring to http://xyz.com/users/123/shopping_cart}

    HTTP/1.1 201 Created
    Content-Type: application/xml; charset=utf-8
    Location: /orders/2008/04/10/1001

    http://xyz.com/orders/2008/04/10/1001

Note that the checkout is implemented by creating another resource, an "Order", which is used to keep track of the fulfillment of the purchase.

Transaction Resource

One common criticism of REST is that, because it is so tied to HTTP (which doesn't support a client callback mechanism), implementing asynchronous services or notifications in REST is hard. So how do we implement long-running transactions (which typically require asynchronicity and callback support) in REST? The basic idea is to immediately create a "Transaction Resource" and return it to the client. While the actual processing happens asynchronously in the background, the client can poll the "Transaction Resource" at any time for the latest processing status.

Let's look at an example: a request to print a book, which may take a long time to complete.

Print a book ...

    POST /books/123 HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {print-request XML referring to http://xyz.com/printers/abc}

    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Location: /transactions/1234

    http://xyz.com/transactions/1234

Note that a response containing the URI of a transaction resource is returned immediately, even before the print job is started. The client can poll the transaction resource to obtain the latest status of the print job.

Check the status of the print job ...

    GET /transactions/1234 HTTP/1.1
    Host: xyz.com

    HTTP/1.1 200 OK
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {status XML: PrintJob "In Progress"}

It is also possible to cancel the transaction if it has not already completed.
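The transaction-resource pattern above can be sketched with a background thread standing in for the long-running print job. The PrintJob class, its status strings, and the timings are illustrative assumptions, not a real print service.

```python
import threading
import time

class PrintJob:
    """Hypothetical stand-in for the resource behind /transactions/1234."""

    def __init__(self):
        # The "transaction resource" exists immediately, before any work
        # is done, so the client has something to poll right away.
        self.status = "In Progress"
        self._cancelled = threading.Event()
        self._worker = threading.Thread(target=self._run)
        self._worker.start()

    def _run(self):
        # Simulate a slow print job, checking for cancellation between pages.
        for _page in range(3):
            if self._cancelled.is_set():
                self.status = "Cancelled"
                return
            time.sleep(0.05)
        self.status = "Completed"

    def get(self):
        # GET /transactions/1234 -- poll the latest status.
        return self.status

    def cancel(self):
        # POST a cancel request; only effective before completion.
        self._cancelled.set()
        self._worker.join()

    def wait(self):
        # Demo helper: block until the background job finishes.
        self._worker.join()

job = PrintJob()
first = job.get()      # typically still "In Progress" while the job runs
job.wait()
final = job.get()      # "Completed"

job2 = PrintJob()
job2.cancel()          # cancelled before the job can finish
```

The key property is that the initial response never blocks on the work itself; the client trades a callback for cheap, idempotent GET polling against the transaction URI.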
Cancel the print job ...

    POST /transactions/1234 HTTP/1.1
    Host: xyz.com
    Content-Type: application/xml; charset=utf-8
    Content-Length: nnn

    {cancel XML}

    HTTP/1.1 200 OK

Conclusion

The Resource Oriented Model that REST advocates provides a more natural fit for the service web. Therefore, I suggest that SOA implementations take the REST model as the default approach.
June 4, 2008
· 132,429 Views

Comments

Escape Local Optimum Trap

Dec 16, 2013 · Allen Coin

No, I haven't.

Thanks for the pointer.

10 ways to protect your company from employee transition risks

Dec 16, 2013 · Tony Thomas

No, I haven't.

Thanks for the pointer.

Introduction to n-Tier Architectures and .Net 2.0

Nov 07, 2009 · Payton Byrd

Thanks for mrflip's feedback.

In fact, take a look at the latest post here at

http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html

Rgds, Ricky

Quick Tip: JSTL to find locale

Nov 23, 2008 · Z T

Thanks Peter for your feedback.

Yes, parallelising a problem that is inherently parallel is trivial. These kinds of problems are called "embarrassingly parallel", and, guess what, many real-world problems are like that. Think about Google's web crawler, indexing, etc.

But other problems are less trivial. Usually they have more steps that are sequentially dependent on each other. These kinds of problems need more analysis, but usually some degree of parallelism can be achieved.

Part of my day job is to explore potential restructurings of sequential algorithms so that they can execute faster. I am quite surprised that many algorithms that seem sequential at first can in fact be parallelized. Even for calculating the transitive closure of a graph, I can imagine starting multiple threads to walk different parts of the graph at the branch points.

I will definitely share more findings as I learn more.

Eclipse: New IDE Eases Open Source SOA

Jun 09, 2008 · Alex Johnson

To read the query template

GET xyz.com/books/queries/q1

To execute the query

GET xyz.com/books/queries/q1/result

If you use XPath, how would it be much different from ...

GET query=attr1/="vanilla"&attr2/subattrB/="strange"

Yours will just be http://xyz.com/books/query?xpath={my_xpath_query_here}

Rgds,

Common REST Design Pattern

Jun 06, 2008 · Ricky Ho

I am not seeing the issue of my query structure. Can you give an example to highlight the issue ?

Common REST Design Pattern

Jun 06, 2008 · Ricky Ho

Regarding PUT vs POST ...

a) PUT is idempotent and POST is not.

b) PUT is more strict: you ask the resource to update itself (the request body should be a new representation of the resource)

c) POST is for general-purpose procedure calls. You ask the resource to do anything by passing in any XML (which should have an associated XSD).

If you want to update an attribute of the resource, you should definitely use PUT

PUT http://xyz.com/myresource/attributeA

a_new_representation

If you want to create a new resource, you have two choices.

a) In case the client knows the new resource's URL, a PUT is always preferred. You can consider this an update to a specifically named attribute.

b) If the client doesn't know the URL, then a POST will be used to the Factory Resource (or the owning resource).

Rgds, Ricky

Common REST Design Pattern

Jun 06, 2008 · Ricky Ho

Thanks for using concrete examples so we can do a technical comparison ...

This is one major difference between SOAP and REST.

1) SOAP Style has few URL endpoints, but each endpoint will have many operations. REST Style has many endpoints, each endpoint has few "standard" operations defined by HTTP Verbs.

In SOAP, GetFoo() and GetBar() need to use POST even though they are supposed to be idempotent; the response cannot be cached either.

In REST, there will be two separate URLs: http://xyz.com/object/foo and http://xyz.com/object/bar

In my book example, if you post "blah, blah, blah" ...

1) to http://xyz.com/books/123/table_of_content, then it means table of content of book123

2) to http://xyz.com/books/123/comments, then it means a new comment to book123

3) to http://xyz.com/books/123/chapters, then it means a new chapter. (You can also use HTTP PUT to chapters/5)

It is true that some scenarios, such as a money transfer from accountA to accountB, may be better represented in RPC style. First of all, these scenarios rarely occur. Even when they do, you can use a REST style to represent them ...

HTTP POST http://xyz.com/fund_transfer_operation

<operation>

<from>http://bank1.com/account/1</from>

<to>http://bank2.com/account/3</to>

<amount>400</amount>

</operation>

You also have an XML Schema attached to the payload. Now tell me what extra stuff that WSDL gives you that XML schema is missing.

Rgds, Ricky

Common REST Design Pattern

Jun 05, 2008 · Ricky Ho

Lets look at what WSDL has added on top of XML Schema ...

1) Operation

2) Input and Output Messages with parts

Since REST uses the standard HTTP verbs, it doesn't need the concept of an "Operation", and therefore has no concept of parameters. REST just needs to define the structure of the message payload, for which XML Schema is already good enough.

There is an interesting article about the inter-operability status of SOAP. http://wanderingbarque.com/nonintersecting/2006/11/15/the-s-stands-for-simple/

Rgds,

Ricky

Eclipse: New IDE Eases Open Source SOA

Jun 05, 2008 · Alex Johnson

If you count the number of projects, I don't disagree that there are more intranet SOA projects. But if you count the traffic volume, I have a different view.

Vendors who sell technology do not usually welcome simple solution approaches, especially when they have already invested in a legacy approach. There is no need to debate SOAP's adoption; audiences should take a look at the reality and make their own judgement. One reason there is not much vendor support for REST is that it is so easy to use that people won't pay, so look at Rails and other open-source REST frameworks.

What I mean by the interoperability problem is not at such a basic level. If WSDL were not interoperable, WS-* could not even have reached its current state. But try to get two different vendors' implementations of WS-Security, or WS-ReliableMessaging, WS-Policy, etc., to work together.

On the application side, WSDL encourages tight coupling with the application object code (such as auto-generation of WSDL, etc.), which makes interoperability very brittle. A code change at the back end in many cases forces the WSDL and Schema to change; usually a manual fix is needed, and errors creep in.

Where are the Java Interfaces for Tivo Series III ?

May 28, 2008 · Jonathan Bruce

I agree that Map/Reduce is not for toy problems. The example I use to illustrate Map/Reduce is simple enough to help in understanding its basic structure, but it is also so simple that it can be solved by many other, simpler approaches.

Map/Reduce is a tool in the toolbox. Use it wherever you see fit.

Rgds, Ricky

JavaForce.com - Latest Java News Blog

May 28, 2008 · Javad min

Short-lived temporary objects will increase the load on the GC. But in most cases people are not creating temporary objects to avoid "synchronized"; they just create temp objects in string manipulation, or create a function object, which can be eliminated easily.

Inappropriate "synchronized" actually causes a bigger scalability problem than temp objects. Use AtomicXXX if you can, and use the lock-free data structures from the Java concurrent package to avoid "synchronized" as much as possible.

Rgds, Ricky

Where are the Java Interfaces for Tivo Series III ?

May 28, 2008 · Jonathan Bruce

"Language" serve the purpose of representing a different level of abstraction. e.g. By using Java annotation, you are introducing a "language". SQL is another kind of higher level language.

Of course, we don't want to have many languages to represent the same abstraction. But having a different language to represent different abstraction is not a problem at all. In fact, it is even desirable because application designer can pick the level of abstraction they are comfortable with or care about.

Rgds,

Ricky

What's New in Python 2.5

May 21, 2008 · admin

I agree that the Map/Reduce model, from an abstract-model perspective, is not restricted to batch-oriented tasks. But if you look at the currently popular implementations, there is quite some overhead in network I/O and file I/O when splitting data, storing intermediate results, etc. While such overhead is insignificant for long-running batch tasks, it is typically not acceptable for online processing where response time is critical.

Looking at its history, the Map/Reduce model evolved from the solutions Google used for web crawling, indexing, etc., which are basically batch, offline tasks.

A common problem I frequently run into: a sequential algorithm is redesigned for parallel execution and typically breaks down into multiple phases of Map/Reduce. But a Reduce task cannot be a Map task at the same time. The Reduce task has to store its result in the distributed file system, where it goes through the scheduler and is then picked up by another Map task. This extra I/O overhead wipes out most of the gain from parallelizing the algorithm.

Has anyone encountered a similar issue? I would love to hear how other people deal with this.

Rgds.
