Logistics is core to an economy's growth. It is a huge industry globally, roughly 80X the size of industries like ride hailing and cab aggregation. In India, it contributes about 14% of GDP. Yet it remains archaic and in dire need of an overhaul. This overhaul has to come through a technological renaissance, much like the one that followed the Dark Ages. At RIVIGO, we are leading this wave through next-generation technology and data solutions.
The way forward is to implement data-driven decision making at all levels. More and more decisions at RIVIGO are driven by data. We are at a point where, for instance, we can storyboard the activities of our field operations team to derive useful insights and weed out inefficiencies.
In this series of posts, we will cover how such decision making is driven in Zoom, our express cargo business. In this business, we deal with shipments that require only part of a truck load, so a single vehicle is shared by multiple clients in order to optimize vehicle utilization. This series specifically covers the technical aspects of the data platform architecture that has been deployed to capture the heaps of data generated by field activities. This platform can scale to support 100X our current field operations, both in terms of data generation and data warehousing. The data captured is then used to precisely predict every life event of a consignment through intelligent algorithms, analytics and machine learning. This is akin to creating the life horoscope of consignments using past life 'karma'.
Our singular focus in Zoom is to achieve 'perfect delivery'. Breaking this down, it entails delivering consignments to clients within the promised turnaround time, with zero damage and at minimal cost. Our objective is to enable this through intelligent data-driven prediction and decision making.
Keeping the business objective in mind, we have built a series of data-aware algorithms to achieve the following primary goals:
- Intelligently plan all consignments in a trip to honour the delivery time promised to customers
- Learn from mistakes and devise solutions to avoid repetition
- Celebrate the good work done by our facility heroes to further ingrain an on-time mindset
- Intelligently load the truck in a way that ensures optimal utilization
- Identify the boxes which have a higher tendency to get damaged
- Automate alerts to operations on important events in consignment lifecycle
- Learn from historical data and schedule trips in a way that ensures maximum vehicle capacity utilisation without breaching the delivery time promised to customers
- Assign consignments to trips in real time to get maximum trip utilisation within the delivery turnaround time
Technical aspects of the data platform
In the current era of data, where storage and processing power are becoming increasingly cheap, there is a lot one can aspire to when building a data pipeline. The same was true when we approached the challenge of building the first version of our data platform to enable intelligent engineering at scale.
In this and subsequent posts, we will talk about each of the topics outlined below in detail. We will also shed light on how we overcame some of the associated challenges.
- Data ingestion – Change data capture
- Data warehousing – All data at one place
- Data enrichment – Refining the data into meaningful events and business activities
- Batch and real time data analysis
- Multidimensional data analysis
- Business dashboards and scheduled reports for business owners
- Data sanity and security
- ML/AI at RIVIGO
While designing our data platform, we considered the following design principles:
- Hot, warm and cold data storage
- Separate storage and compute to scale independently
- Support for Lambda architecture
- Enable data science support
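As a rough illustration of the Lambda architecture principle listed above, the following Python sketch answers a query by merging a batch view (recomputed from the full history up to a cutoff) with a speed view (covering only events newer than that cutoff). The function names, the `hub` field, the timestamps and the cutoff are all hypothetical; this is a toy model of the idea, not our production code.

```python
from collections import Counter

def build_batch_view(master_events, cutoff_ts):
    """Batch layer: recompute counts from the full history up to cutoff_ts."""
    return Counter(e["hub"] for e in master_events if e["ts"] <= cutoff_ts)

def build_speed_view(recent_events, cutoff_ts):
    """Speed layer: counts for events that arrived after the last batch run."""
    return Counter(e["hub"] for e in recent_events if e["ts"] > cutoff_ts)

def query(batch_view, speed_view, hub):
    """Serving layer: merge both views to answer a query."""
    return batch_view[hub] + speed_view[hub]

# Hypothetical consignment events, keyed by the hub that handled them.
events = [
    {"hub": "DEL", "ts": 100},
    {"hub": "DEL", "ts": 200},
    {"hub": "BLR", "ts": 250},
    {"hub": "DEL", "ts": 320},
]
cutoff = 300  # assumed timestamp of the last completed batch run

batch_view = build_batch_view(events, cutoff)
speed_view = build_speed_view(events, cutoff)
total_del = query(batch_view, speed_view, "DEL")
print(total_del)
```

The batch view can be rebuilt from scratch at any time, while the speed view only has to cover the gap since the last batch run.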
In our architecture, transactional data is written to MySQL, MongoDB and Neo4j by Spring Boot based microservices. These microservices communicate over a data stream backbone built on Kafka. The transactional changes recorded in the binlogs of MySQL and the oplogs of MongoDB are streamed to Kafka. For this, we serialize the data captured in the binlogs to changelog JSON and stream it to Kafka using a Kafka Connect cluster and Debezium.
Once the data is in Kafka, we run real-time stream processing with Kafka Streams clients (KSC) to enrich the data for real-time use cases. We also store the captured changelog in S3, relayed from Kafka using Kafka Connect. S3 serves as the starting point of our data lake. This data is then stored in Apache Hive tables after some de-duplication and cleaning.
In Hive, we maintain two tables for each data source: one holding a point-in-time snapshot and the other an insert-only audit trail. We run batch jobs to provide analytics on top of this data. The output is used to produce MIS (management information system) reports and to build several dashboards. To enable data analysts to write ad-hoc queries and gather new insights when business flows evolve or new features are added, we use Presto.
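The relationship between the two tables can be sketched in a few lines of Python. This is only an illustration under assumptions: the real tables live in Hive, and the `seq` ordering field, the event layout and the helper names are hypothetical. The audit trail keeps every de-duplicated change, while the snapshot keeps only the latest state of each row.

```python
def dedupe(changelog):
    """Drop re-delivered events, identified here by (table, id, seq)."""
    seen, unique = set(), []
    for event in changelog:
        key = (event["table"], event["id"], event["seq"])
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique

def build_tables(changelog):
    """Return (snapshot, audit_trail) derived from a raw changelog."""
    # Audit trail: every distinct change, in order, never updated in place.
    audit_trail = sorted(dedupe(changelog), key=lambda e: e["seq"])
    # Snapshot: replay the audit trail and keep the latest row per key.
    snapshot = {}
    for event in audit_trail:
        row_key = (event["table"], event["id"])
        if event["op"] == "d":  # a delete removes the row from the snapshot
            snapshot.pop(row_key, None)
        else:
            snapshot[row_key] = event["row"]
    return snapshot, audit_trail

changelog = [
    {"table": "consignment", "id": 1, "seq": 1, "op": "c", "row": {"status": "booked"}},
    {"table": "consignment", "id": 1, "seq": 2, "op": "u", "row": {"status": "procured"}},
    # The same change delivered twice, as can happen with at-least-once delivery.
    {"table": "consignment", "id": 1, "seq": 2, "op": "u", "row": {"status": "procured"}},
]

snapshot, audit_trail = build_tables(changelog)
print(snapshot)
print(len(audit_trail))
```

Because the snapshot is derived purely from the audit trail, it can be rebuilt at any time, and the state as of an earlier point can be obtained by replaying only part of the trail.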
To run the Spark and Hive jobs for batch processing, we use the Qubole Resource Manager. In addition to scaling up, it also scales down the compute workers when they are not required (in contrast to the YARN resource manager), which helps optimize the cost of the platform.
The ETL pipelines that run as batch jobs are orchestrated using Apache Airflow, which manages the DAGs (directed acyclic graphs) that chain together the map-reduce jobs for each business use case. This lets our engineers focus on writing complex, interdependent algorithm steps in Apache Airflow; the output is consumed by our dashboards and business reports.
For centralized logging across these moving infrastructure parts, we use the Elastic Stack, specifically Elasticsearch and Kibana.
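To make the DAG idea concrete, here is a minimal sketch of dependency ordering using Python's standard `graphlib` module (Python 3.9+). It deliberately does not use the Airflow API, and the task names are hypothetical; it only shows how declaring dependencies between steps determines a valid execution order.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it can start.
pipeline = {
    "load_changelog": set(),
    "dedupe": {"load_changelog"},
    "build_snapshot": {"dedupe"},
    "build_audit_trail": {"dedupe"},
    "daily_report": {"build_snapshot", "build_audit_trail"},
}

# static_order() yields the tasks so that every dependency comes first.
run_order = list(TopologicalSorter(pipeline).static_order())
print(run_order)
```

An orchestrator adds scheduling, retries and monitoring on top of this ordering, and it can run independent steps such as `build_snapshot` and `build_audit_trail` in parallel.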
Data ingestion: Change data capture
The first challenge for data infrastructure is to capture all the changelogs of events happening in the field. As briefly mentioned in the architecture section above, MySQL captures the intent of transactional state changes, and we use Debezium on a Kafka Connect cluster to capture the changelog. To know more about the virtues of changelog capturing, please refer to this excellent article by Martin Kleppmann.
Using this stack, we started to capture the binlog for MySQL and the oplog for MongoDB. An additional benefit of this stack is that it works in a reactive way. It lets us capture the changelogs as events, i.e., records of actions that have already happened and can no longer be mutated. This enables convenient and precise audit log generation. Even if a corrective action needs to be taken, one can add a new changelog entry rather than make an irrecoverable update that may corrupt the audit trail.
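For illustration, the sketch below turns one change event into an audit record. The event is a simplified payload shaped like Debezium's change event envelope (`before`, `after`, `source`, `op`, `ts_ms`); depending on converter settings, real messages may also carry a schema wrapper and many more `source` fields. The sample values, the `zoom` database name and the `to_audit_record` helper are hypothetical.

```python
import json

# A simplified change event for an UPDATE on a consignment row.
raw_event = json.dumps({
    "before": {"id": 1, "status": "booked"},
    "after": {"id": 1, "status": "procured"},
    "source": {"db": "zoom", "table": "consignment"},  # hypothetical names
    "op": "u",
    "ts_ms": 1546300800000,
})

# Operation codes used in the envelope.
OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}

def to_audit_record(raw):
    """Convert one change event into an immutable audit record."""
    event = json.loads(raw)
    before = event.get("before") or {}
    after = event.get("after") or {}
    # Keep only the columns whose value changed, as (old, new) pairs.
    changed = {k: (before.get(k), v) for k, v in after.items() if before.get(k) != v}
    return {
        "table": event["source"]["table"],
        "operation": OPS[event["op"]],
        "changed": changed,
        "ts_ms": event["ts_ms"],
    }

audit_log = []  # append-only: corrections are new entries, never edits
audit_log.append(to_audit_record(raw_event))
print(audit_log[0])
```

Since each record describes something that has already happened, a correction is expressed by appending another record rather than rewriting an earlier one.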
This is awesome at multiple levels:
- It enables the creation of an entire platform working on changelogs, without proactively changing application workflows for capturing the changelogs, by directly streaming the state changes from database binlogs.
- It pushes the data on Kafka queues, which enables us to provide real-time insights using state-of-the-art streaming technologies like Kafka Streams, Apache Spark Streaming or Apache Flink.
- As the data reaches Kafka, it can be consumed by multiple systems (like warehouse and key value cache store). If downstream consumers get out of sync for any reason, they can be brought back in sync by replaying the messages persisted on Kafka.
- It fits in well with the fact that we are a Java shop.
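The replay property mentioned in the list above can be illustrated with a small in-memory model. The `Topic` and `CacheConsumer` classes below are hypothetical stand-ins, not the Kafka client API: the topic is an append-only log, and the consumer rebuilds its state by resetting its offset and reading the log again.

```python
class Topic:
    """Append-only log standing in for a single Kafka topic partition."""

    def __init__(self):
        self._log = []

    def append(self, message):
        self._log.append(message)

    def read_from(self, offset):
        return self._log[offset:]


class CacheConsumer:
    """Key-value cache that is materialized from the topic."""

    def __init__(self, topic):
        self.topic = topic
        self.offset = 0
        self.state = {}

    def poll(self):
        # Apply every message that has not been consumed yet.
        for key, value in self.topic.read_from(self.offset):
            self.state[key] = value
            self.offset += 1

    def rebuild(self):
        # Recover by discarding local state and replaying from the start.
        self.state.clear()
        self.offset = 0
        self.poll()


topic = Topic()
topic.append(("cn-1", "booked"))
topic.append(("cn-1", "procured"))
topic.append(("cn-2", "booked"))

cache = CacheConsumer(topic)
cache.poll()
cache.state["cn-1"] = "corrupted"  # simulate the cache drifting out of sync
cache.rebuild()
print(cache.state)
```

With a real cluster, the equivalent step is to reset the consumer's offsets to an earlier position, which works as long as the messages are still within the topic's retention.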
However, as with all good things, this too comes with some caveats. When a transactional scope is used on an RDBMS, we get ACID guarantees. We have no such guarantees once we map each table to a Kafka topic and start streaming the data. A side effect is that the systems on the other side of the binlogs are only eventually consistent. This brings a new set of challenges, as certain race conditions may occur. We will explain this with an example.
Consider a workflow where we receive a service request to book a consignment. In the field, a pilot (truck driver) reaches the client, procures the boxes and collects the money. Once the boxes are procured, the state of the consignment must be set to 'procured' and that of payment to 'complete'. Let's suppose there were 10 such updates (10 consignments booked) that created data entries in transactional scope and the changelogs were streamed to Kafka topics. Now if someone in tech support or field operations makes an update on the same consignments, it would cause updates to the same rows. In simple terms, the situation is as below:
- Event A caused Table A and Table B to update rows (id = 1; table = A, id = 1; table = B);
- Event B caused Table B and Table C to update rows (id = 1; table = B, id = 1; table = C);
This implies we have one event in Table A, two events in Table B (with the same id due to updates on the same consignment) and one event in Table C. Since the data is only eventually consistent, we need to identify which of the two events in Table B has to be joined to Table A and which to Table C.
In other words, we need to uniquely identify the transactions that bound the updates. MySQL provides a feature called GTID (global transaction identifier) that enables this. When doing real-time stream joins, GTID can be used to find order in chaos: it tags each transactional boundary with a unique identifier, so that streaming joins can be run accurately.
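The Event A and Event B scenario can be sketched as follows. This is a simplified illustration, assuming each change event carries the GTID of the transaction that produced it; the GTID values, the row contents and the `group_by_transaction` helper are hypothetical.

```python
from collections import defaultdict

# Change events as they might arrive across three table topics.
# Both events for Table B share id = 1; only the GTID tells them apart.
events = [
    {"gtid": "uuid-1:10", "table": "A", "id": 1, "row": {"status": "procured"}},
    {"gtid": "uuid-1:10", "table": "B", "id": 1, "row": {"payment": "complete"}},
    {"gtid": "uuid-1:11", "table": "B", "id": 1, "row": {"payment": "refunded"}},
    {"gtid": "uuid-1:11", "table": "C", "id": 1, "row": {"note": "support update"}},
]

def group_by_transaction(events):
    """Group row changes by the transaction (GTID) that produced them.

    Simplification: assumes at most one changed row per table per
    transaction, so a later row for the same table would overwrite it.
    """
    transactions = defaultdict(dict)
    for event in events:
        transactions[event["gtid"]][event["table"]] = event["row"]
    return dict(transactions)

transactions = group_by_transaction(events)
event_a = transactions["uuid-1:10"]  # Table A joined with the first Table B row
event_b = transactions["uuid-1:11"]  # the second Table B row joined with Table C
print(event_a)
print(event_b)
```

Joining on the row id alone would be ambiguous here, since both Table B events have id = 1; adding the transaction identifier to the join key removes that ambiguity.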
This is how we have overcome the challenge of capturing the changelogs accurately at RIVIGO. This is the first step in creating the life horoscope of a consignment, comparable to capturing past life events in a journal to then predict the future life journey. In the next post, we will cover the subsequent topics of data warehousing and data enrichment.