Big Data: Apache Kafka, Schema Registry And Avro Records
The most common ways to store data are CSV, XML and JSON. JSON is less verbose than XML, but both still use a lot of space compared to binary formats. In JSON, you repeat every field name with every single record. In contrast, Avro uses a schema to specify the structure of the data being encoded. For a small dataset, the gains are negligible, but once you get into the terabytes, the choice of data format can have a big impact.
When looking at raw Avro data, there is nothing to identify fields or their datatypes. The encoding simply consists of values concatenated together; a string, for instance, is just a length prefix followed by UTF-8 bytes. To parse the binary data, you go through the fields in the order they appear in the schema and let the schema tell you the datatype of each field. Avro therefore requires the entire writer’s schema to be present when reading a record. We can’t simply include the full schema with every record, because the schema would likely be much bigger than the encoded data, negating all the space savings of the binary encoding. In the case of a large file with lots of records, we include the writer’s schema once at the beginning of the file. When two processes are communicating over the network, they can negotiate the schema version while setting up the connection and then use that schema for the lifetime of the connection.
With Kafka, we typically follow a common architecture pattern and use a schema registry. The schema registry is not part of Apache Kafka, but there are several open source options to choose from. The idea is to store the schemas used to write data to Kafka in the registry, and to store only an identifier for the schema in each record we produce to Kafka. Consumers can then use the identifier to pull the schema out of the registry and deserialize the data.
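For example, Confluent’s Schema Registry frames each record value with a single magic byte (0) and a 4-byte, big-endian schema ID, followed by the Avro-encoded payload. Here is a minimal sketch of how a consumer could split that framing apart; the helper name is ours, not part of any library.
import struct

def split_schema_id(message_value: bytes):
    # Confluent wire format: 1-byte magic byte (0), 4-byte big-endian
    # schema ID, then the Avro-encoded payload.
    magic, schema_id = struct.unpack(">bI", message_value[:5])
    if magic != 0:
        raise ValueError("value is not framed with a schema registry ID")
    return schema_id, message_value[5:]

# A consumer would look up schema_id in the registry (and cache it locally),
# then use that writer's schema to deserialize the remaining payload.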
One of the most interesting features of Avro, and what makes it a good fit for a message bus like Kafka, is that when the application writing messages switches to a new schema, the application that is reading the data can continue processing messages without requiring any change or update.
For example, suppose we had the following schema.
{
  "namespace": "customerManagement.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "faxNumber", "type": ["null", "string"], "default": null}
  ]
}
We used this schema for a few months and generated a few terabytes of data in this format. Now suppose we decide that, in the new version, we will replace the faxNumber field with an email field.
{
  "namespace": "customerManagement.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
After upgrading to the new version, old records will contain “faxNumber” and new records will contain “email”. The reading application will contain calls to methods similar to getName(), getId(), and getFaxNumber(). If it encounters a message written with the new schema, getName() and getId() will continue working the same way as before, but getFaxNumber() will return null because the message does not contain a fax number. If we later upgrade the reading application so that it no longer has the getFaxNumber() method but rather getEmail(), then when it encounters a message written with the old schema, getEmail() will simply return null because the older messages do not contain an email address.
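The same resolution rules apply in the Python Avro library that we install in the next section. The sketch below writes a record with the old schema and reads it back with the new one; the .avsc file names are hypothetical, and DatumReader takes the writer’s schema first and the reader’s schema second.
import io
import avro.schema
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter

# Hypothetical file names for the two schema versions shown above.
old_schema = avro.schema.Parse(open("customer_v1.avsc").read())  # faxNumber version
new_schema = avro.schema.Parse(open("customer_v2.avsc").read())  # email version

# Encode a record with the old (writer's) schema.
buffer = io.BytesIO()
DatumWriter(old_schema).write(
    {"id": 1, "name": "Alyssa", "faxNumber": "555-1234"}, BinaryEncoder(buffer))
buffer.seek(0)

# Decode it with the new (reader's) schema: faxNumber is dropped and
# email falls back to its default of null.
decoded = DatumReader(old_schema, new_schema).read(BinaryDecoder(buffer))
print(decoded)  # {'id': 1, 'name': 'Alyssa', 'email': None}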
Code
There are several ways of working with Avro; in this article we’ll be using Python. The following commands download the tarball, decompress it and install the Avro library.
wget http://apache.mirror.globo.tech/avro/avro-1.8.2/py3/avro-python3-1.8.2.tar.gz
tar -xvf avro-python3-1.8.2.tar.gz
cd avro-python3-1.8.2
sudo python3 setup.py install
Next, we’ll import the required libraries. If your installation was successful, you shouldn’t see any errors.
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
Avro schemas are typically stored in .avsc files. Every record schema has a namespace, a type, a name and a list of fields. Each field has a name property and a type property: the name identifies the key, and the type specifies what kind of content can be associated with that key. Notice how you must explicitly include null in the type if you want to make a field optional.
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
As we saw previously, whenever we want to write data, we require the schema.
schema = avro.schema.Parse(open('user.avsc', "r").read())
Next, we create a users.avro file with two users.
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()
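Writing the file also lets us check the earlier claim that the writer’s schema is stored once at the beginning of the file. As a quick sanity check: an Avro container file starts with the magic bytes Obj followed by a version byte, and the header metadata embeds the schema JSON.
with open("users.avro", "rb") as f:
    header = f.read(500)

print(header[:4])         # b'Obj\x01' -- the Avro container file magic
print(b"User" in header)  # True: the writer's schema JSON is embedded in the header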
Then, to read the contents of the file, we run the following lines.
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print(user)
reader.close()
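If everything worked, the loop should print something like the following. Alyssa’s favorite_color comes back as None because we never set it and its type union allows null.
{'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}
{'name': 'Ben', 'favorite_number': 7, 'favorite_color': 'red'}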
If you’re working with data that has been compressed using Snappy, you’ll need to install packages at both the operating-system level and in Python.
brew install snappy
pip install python-snappy
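With those packages in place, you can ask DataFileWriter to compress its data blocks with Snappy by passing a codec argument; a minimal sketch reusing the schema from above:
# Same schema as before, but the data blocks are Snappy-compressed.
writer = DataFileWriter(open("users_snappy.avro", "wb"), DatumWriter(), schema,
                        codec="snappy")
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.close()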