As mentioned before, our final goal is to build a SaaS big data analysis application, which will be deployed on AWS servers. To fulfill this goal, we need to build distributed crawling, indexing, and training systems.
The focus of this article is how to build the distributed crawling system. The fancy name for this system will be Black Widow.
As usual, let us start with the business requirements for the system. Our goal is to build a scalable crawling system that can be deployed on the cloud. The system should be able to function in an unreliable, high-latency network and recover automatically from a partial hardware or network failure.
For the first release, the system can crawl from three kinds of sources: DataSift, the Twitter API, and RSS feeds. The data crawled back are called Comments. The RSS crawlers are supposed to read public sources like websites or blogs, which is free of charge. DataSift and Twitter both provide proprietary APIs to access their streaming services. DataSift charges its users by comment count and by the complexity of the CSDL (Curated Stream Definition Language, their own query language) query. Twitter, on the other hand, offers the free Twitter Sampler stream.
To control cost, we need to implement a mechanism that limits the number of comments crawled from commercial sources like DataSift. As DataSift also provides Twitter comments, a single comment may arrive from different sources. At the moment, we do not try to eliminate this and accept it as data duplication. However, the user can avoid the problem through configuration (by not choosing both Twitter and DataSift Twitter together).
For future extension, the system should be able to link up related comments to form a conversation.
Food for Thought
Our first thought on receiving the requirements was to do the crawling on the nodes, which we call Spawns, and let the hub, which we call Black Widow, coordinate the effort among the nodes. This idea was quickly accepted by team members as it allows the system to scale well, with the hub doing limited work.
Like any other centralized system, Black Widow suffers from the single point of failure problem. To ease this problem, we allow a node to function independently for a short period after losing its connection to Black Widow. This gives the support team some breathing room to bring up a backup server.
Another bottleneck in the system is data storage. For the volume of data being crawled (easily reaching a few thousand records per second), NoSQL is clearly the choice for storing the crawled comments. We have experience working with Lucene and MongoDB. However, after research and some minor experiments, we chose Cassandra as the NoSQL database.
With these few thoughts, we envision the distributed crawling system being built following this prototype:
In the diagram above, Black Widow, or the hub, is the only server that has access to the SQL database system. This is where we store the crawling configuration. Therefore, all the Spawns, or crawling nodes, are fully stateless. A Spawn simply wakes up, registers itself with Black Widow, and does the assigned jobs. After getting the comments, the Spawn stores them in the Cassandra cluster and also pushes them to some queues for further processing.
Brainstorming of possible issues
To explain the design to non-technical people, we like to relate the business requirements to a similar real-life problem so that it is easier to understand. The similar problem we chose is coordinating effort among volunteers.
Imagine we need to do a lot of preparation work for the upcoming Olympics and decide to recruit volunteers from all around the world to help. We do not know the volunteers, but the volunteers know our email address, so they can contact us to register. Only then do we know their email addresses and can send tasks to them by email. We would not want to send one task to two volunteers or leave some tasks unattended. We want to distribute the tasks evenly so that no volunteer is overloaded.
Due to cost, we would not contact them by mobile phone. However, because email is less reliable, we request a confirmation when sending out tasks to volunteers. A task is considered assigned only when the volunteer replies with a confirmation.
In the example above, the volunteers represent the Spawn nodes, while email communication represents the unreliable, high-latency network. Here are some problems that we need to solve:
1/ Node failure
For this problem, the best approach is to check regularly. If a volunteer stops responding to the regular progress check emails, the task should be re-assigned to someone else.
2/ Optimization of task assignment
Some tasks are related, so assigning related tasks to the same person helps reduce the total effort. This happens with our crawling as well: some crawling configurations have similar search terms, and grouping them together to share a streaming channel helps reduce the final bill.
Another concern is fairness, or the ability to distribute the workload evenly among volunteers. The simplest strategy we can think of is Round Robin, with a minor tweak: remembering earlier assignments. If a task is very similar to tasks we assigned before, it skips the Round Robin selection and is assigned directly to the same volunteer.
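The Round Robin tweak described above can be sketched roughly as follows. This is only an illustration under our own assumptions: the class name StickyRoundRobin is hypothetical, and "similar" is reduced to two tasks sharing the same groupKey string (for example, a normalized set of search terms).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical assigner, not the original Black Widow code.
// "groupKey" stands in for whatever makes two tasks "similar".
class StickyRoundRobin {
    private final List<String> nodes;
    private final Map<String, String> earlierAssignments = new HashMap<>();
    private int next = 0;

    StickyRoundRobin(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    // Returns the node for a task. Tasks with a key seen before go to the node
    // chosen earlier; only unseen keys advance the Round Robin pointer.
    synchronized String assign(String groupKey) {
        String remembered = earlierAssignments.get(groupKey);
        if (remembered != null) {
            return remembered;
        }
        String node = nodes.get(next);
        next = (next + 1) % nodes.size();
        earlierAssignments.put(groupKey, node);
        return node;
    }
}
```

With two nodes, a third distinct key wraps around to the first node, while a repeated key always lands on the node that already handles it.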
3/ The hub is not working
If, for some reason, our email server is down and we can no longer contact the volunteers, it is better to let the volunteers stop working on the assigned tasks. The main concern here is cost overrun or wasted effort. However, stopping work immediately is too hasty, as a temporary infrastructure issue may have caused the communication problem.
Hence, we need to find a reasonable amount of time for the node to continue functioning after being detached from the hub.
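A minimal sketch of this grace period on the node side could look like the following. The class name HubContactTracker and the idea of passing the clock in as a parameter are our assumptions for illustration; the actual length of the grace period is a tuning decision that this article does not fix.

```java
// Hypothetical node-side tracker; names and values are illustrative only.
class HubContactTracker {
    private final long gracePeriodMillis;
    private long lastContactMillis;

    HubContactTracker(long gracePeriodMillis, long nowMillis) {
        this.gracePeriodMillis = gracePeriodMillis;
        this.lastContactMillis = nowMillis;
    }

    // Called whenever the hub sends a job or polls for status.
    synchronized void onHubContact(long nowMillis) {
        lastContactMillis = nowMillis;
    }

    // The node keeps working through short outages and only counts as
    // detached once the hub has been silent longer than the grace period.
    synchronized boolean isDetached(long nowMillis) {
        return nowMillis - lastContactMillis > gracePeriodMillis;
    }
}
```

Taking the current time as an argument, rather than reading the system clock inside the class, keeps the behavior easy to test.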
4/ Cost control
Due to business requirements, there are two kinds of cost control that we need to implement. The first is the total number of comments crawled per crawler, and the second is the total number of comments crawled by all crawlers belonging to the same user.
This is where we had a debate about the best approach to implementing cost control. It is very straightforward to implement the limit for each crawler. We can simply pass this limit to the Spawn node, and it will automatically stop the crawler when the limit is reached.
However, the limit per user is not so straightforward, and we have two possible approaches. For the simpler choice, we can send all the crawlers of one user to the same node. Then, similar to the earlier problem, the Spawn node knows the number of comments collected and stops all crawlers when the limit is reached. This approach is simple, but it limits the ability to distribute jobs evenly among nodes. The alternative approach is to let all the nodes retrieve and update a global counter. This approach creates heavy internal network traffic and adds considerable delay to comment processing time.
At this point, we have tentatively chosen the global counter approach. This can be reconsidered if performance becomes a major concern.
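To make the global counter idea concrete, here is a small in-memory sketch. It is only a stand-in: in a real deployment the counter would have to live in a store that every Spawn node can reach, and this article does not specify that store. The class name UserQuota and its methods are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// In-memory stand-in for the shared per-user counter (an assumption for illustration).
class UserQuota {
    private final ConcurrentMap<String, AtomicLong> counts = new ConcurrentHashMap<>();
    private final long limitPerUser;

    UserQuota(long limitPerUser) {
        this.limitPerUser = limitPerUser;
    }

    // Atomically counts one comment for the user. Returns false once the limit
    // is reached, which is the signal to stop all of that user's crawlers.
    boolean tryCount(String userId) {
        AtomicLong counter = counts.computeIfAbsent(userId, id -> new AtomicLong());
        while (true) {
            long current = counter.get();
            if (current >= limitPerUser) {
                return false;
            }
            if (counter.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    long count(String userId) {
        AtomicLong counter = counts.get(userId);
        return counter == null ? 0 : counter.get();
    }
}
```

The compare-and-set loop ensures that concurrent crawlers never push the count past the limit, which is the property a shared counter needs regardless of where it is stored.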
5/ Deploy on the cloud
Like any other cloud application, we cannot put too much trust in the network or infrastructure. Here is how we make our application conform to the checklist mentioned in the last article:
- Stateless: Our Spawn nodes are stateless but the hub is not. Therefore, in our design, the nodes do the actual work and the hub only coordinates the effort.
- Idempotence: We implement the hashCode and equals methods for every crawler configuration and store the crawler configurations in a Map or Set. Therefore, a crawler configuration can be sent multiple times without any side effect. Moreover, our node selection approach ensures that the job will be sent to the same node.
- Data Access Object: We apply the JsonIgnore filter to every model object to make sure no confidential data travels over the network.
- Play Safe: We implement a health-check API for each node and for the hub itself. The first level of support is notified immediately when anything goes wrong.
- Hub failure: A node registers itself with the hub when it starts up. From then on, communication is one-way: only the hub sends jobs to the node and polls for status updates. The node is considered detached if it has had no contact from the hub for a pre-defined period. A detached node clears all its existing jobs and job configurations and starts registering itself with the hub again. If the incident was caused by hub failure, a new hub will fetch the crawling configurations from the database and start distributing jobs again.
- Node failure: When the hub fails to poll a node, it does a hard reset by removing all working jobs and re-distributing them from the beginning to the working nodes. This re-distribution process helps ensure an optimized distribution.
- Job failure: Two kinds of failure can happen when the hub sends and polls jobs. If a job fails in the polling process but the Spawn node is still working well, Black Widow can re-assign the job to the same node. The same can be done if sending the job failed.
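The idempotence item in the checklist above relies on value-based equality. A minimal sketch, assuming a hypothetical CrawlerConfig with just two fields (the real configuration is not shown in this article), might be:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical configuration class; the fields are illustrative assumptions.
final class CrawlerConfig {
    final String source;
    final String searchTerms;

    CrawlerConfig(String source, String searchTerms) {
        this.source = source;
        this.searchTerms = searchTerms;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CrawlerConfig)) return false;
        CrawlerConfig other = (CrawlerConfig) o;
        return source.equals(other.source) && searchTerms.equals(other.searchTerms);
    }

    @Override
    public int hashCode() {
        return Objects.hash(source, searchTerms);
    }
}

// Hypothetical node-side holder for received jobs.
class JobInbox {
    private final Set<CrawlerConfig> jobs = new HashSet<>();

    // Receiving an equal configuration a second time leaves the set unchanged.
    synchronized boolean receive(CrawlerConfig config) {
        return jobs.add(config);
    }

    synchronized int size() {
        return jobs.size();
    }
}
```

Because two configurations with the same field values are equal, the hub can safely resend a job after a timeout without the node ending up with two copies of it.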
Data Source and Subscriber
Our initial thought was that each crawler could open its own channel to retrieve data, but on further inspection this no longer made sense. For RSS, we can scan all URLs once and find the keywords that may belong to multiple crawlers. Twitter supports up to 200 search terms in a single query, so it is possible for us to open a single channel that serves multiple crawlers. For DataSift it is quite rare, but through human mistake or luck, it is possible to have crawlers with identical search terms.
This situation led us to split the crawler into two entities: subscriber and data source. The subscriber is in charge of consuming the comments, while the data source is in charge of crawling them. With this design, if there are two crawlers with similar keywords, a single data source will be created to serve two subscribers, each processing the comments in its own way.
A data source is created if and only if no similar data source exists. It starts working when the first subscriber subscribes to it and retires when the last subscriber unsubscribes from it. With Black Widow sending similar subscribers to the same node, we can minimize the number of data sources created and, indirectly, minimize the crawling cost.
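The data source life-cycle described above can be sketched as a small registry. This is a simplification under our own assumptions: the class name DataSourceRegistry is hypothetical, "similar" is reduced to identical search terms, and opening or closing the real streaming channel is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical registry; a data source is represented only by its search terms.
class DataSourceRegistry {
    // search terms -> ids of the subscribers sharing that data source
    private final Map<String, Set<String>> sources = new HashMap<>();

    // Creates the data source only if none with the same terms exists yet.
    synchronized void subscribe(String searchTerms, String subscriberId) {
        sources.computeIfAbsent(searchTerms, terms -> new HashSet<>()).add(subscriberId);
    }

    // Retires the data source when its last subscriber leaves.
    synchronized void unsubscribe(String searchTerms, String subscriberId) {
        Set<String> subscribers = sources.get(searchTerms);
        if (subscribers == null) {
            return;
        }
        subscribers.remove(subscriberId);
        if (subscribers.isEmpty()) {
            sources.remove(searchTerms);
        }
    }

    synchronized int activeSourceCount() {
        return sources.size();
    }
}
```

Two subscribers with the same terms share one entry, so only one channel would be paid for, and the entry disappears as soon as nobody is listening.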
The biggest concern with the data structure is thread safety. In the Spawn node, we must store all running subscribers and data sources in memory. There are a few scenarios in which we need to modify or access this data:
- When a subscriber hits the limit, it automatically unsubscribes from the data source, which may lead to deactivation of the data source.
- When Black Widow sends a new subscriber to a Spawn node.
- When Black Widow sends a request to unsubscribe an existing subscriber.
- When the health-check API exposes all running subscribers and data sources.
- When Black Widow regularly polls the status of each assigned subscriber.
- When the Spawn node regularly checks for and disables orphan subscribers (subscribers that are no longer polled by Black Widow).
To keep these operations safe, we apply the following practices:
- We implement the hashCode and equals methods for every subscriber and data source.
- We choose a Set or Map to store the collections of subscribers and data sources. For records that are equal (same hashCode and equals), a Map replaces the stored value on a new insertion, but a Set skips the new record. Therefore, if we use a Set, we need to ensure that new records can replace old records.
- We use synchronized in the data access code.
- If a Spawn node receives a new subscriber that is similar to an existing subscriber, it compares the two and prefers updating the existing subscriber over replacing it. This avoids unsubscribing and re-subscribing identical subscribers, which may interrupt the data source stream.
On the Black Widow side, job routing has two goals:
- Distribute the jobs evenly among the Spawn nodes.
- Route similar jobs to the same node.
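The thread-safety points listed above (value-based identity, synchronized access, and updating an existing subscriber in place instead of replacing it) can be illustrated with a short sketch. The Subscriber and SubscriberStore classes here are hypothetical, and the only mutable setting we model is the comment limit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical subscriber: the id is its identity, the limit is a mutable setting.
class Subscriber {
    final String id;
    private long limit;

    Subscriber(String id, long limit) {
        this.id = id;
        this.limit = limit;
    }

    synchronized long getLimit() { return limit; }

    synchronized void setLimit(long limit) { this.limit = limit; }
}

// Hypothetical in-memory store on the Spawn node.
class SubscriberStore {
    private final Map<String, Subscriber> byId = new HashMap<>();

    // Returns true when a new subscriber was added, false when an existing one
    // was updated in place (so its data source subscription is not interrupted).
    synchronized boolean addOrUpdate(Subscriber incoming) {
        Subscriber existing = byId.get(incoming.id);
        if (existing != null) {
            existing.setLimit(incoming.getLimit());
            return false;
        }
        byId.put(incoming.id, incoming);
        return true;
    }

    synchronized Subscriber get(String id) {
        return byId.get(id);
    }

    // Copy handed to readers such as a health-check endpoint, so they never
    // iterate over the live map while another thread modifies it.
    synchronized List<Subscriber> snapshot() {
        return new ArrayList<>(byId.values());
    }
}
```

Keying the map by id sidesteps the Set-versus-Map replacement question, at the cost of having to copy over every setting that may change.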
The next diagram shows the subscriber life-cycle:
As described above, Black Widow is responsible for polling the subscribers it sends to Spawn nodes. If a subscriber is no longer being polled by Black Widow, the Spawn node treats it as an orphan and removes it. This practice helps eliminate the threat of a Spawn node running an obsolete subscriber.
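As a rough illustration of the orphan check, the Spawn node could record when each subscriber was last polled and sweep out the silent ones periodically. The class name OrphanSweeper and the silence threshold are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical node-side bookkeeping of when each subscriber was last polled.
class OrphanSweeper {
    private final long maxSilenceMillis;
    private final Map<String, Long> lastPolled = new HashMap<>();

    OrphanSweeper(long maxSilenceMillis) {
        this.maxSilenceMillis = maxSilenceMillis;
    }

    // Called when Black Widow sends or polls the subscriber.
    synchronized void onPolled(String subscriberId, long nowMillis) {
        lastPolled.put(subscriberId, nowMillis);
    }

    // Removes and returns the subscribers that have not been polled recently.
    synchronized List<String> sweep(long nowMillis) {
        List<String> orphans = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastPolled.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > maxSilenceMillis) {
                orphans.add(entry.getKey());
                it.remove();
            }
        }
        return orphans;
    }
}
```

The caller would then unsubscribe each returned id from its data source, which in turn may retire the source.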
On Black Widow, when polling of a subscriber fails, it tries to find a new node to assign the job to. If the subscriber's Spawn node is still available, the same job will likely go to the same node again because of our routing mechanism.
In a happy scenario, all the subscribers are running, Black Widow is polling, and nothing else happens. However, this is not likely to happen in real life. There will be changes in Black Widow and the Spawn nodes from time to time, triggered by various events.
For Black Widow, there will be changes under the following circumstances:
- A subscriber hits its limit
- A new subscriber is found
- An existing subscriber is disabled by the user
- Polling of a subscriber fails
- Polling of a Spawn node fails