What is MongoDB?

  • MongoDB is a document database that combines the scalability and flexibility you want with the querying and indexing you need.

  • It stores data in flexible, JSON-like documents, meaning fields can vary from document to document and data structure can be changed over time.

  • The document model maps to the objects in your application code, making data easy to work with.

  • Ad hoc queries, indexing, and real-time aggregation provide powerful ways to access and analyze your data.

  • MongoDB is a distributed database at its core, so high availability, horizontal scaling, and geographic distribution are built in.
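
To make the "flexible documents" point concrete, here is a small sketch using plain Python dictionaries, which is how pymongo represents documents. The "users" data and the field names are made up for illustration; no database is needed to run it.

```python
import json

# Two documents that could live in the same (hypothetical) "users" collection.
# They share "name" but otherwise carry different fields.
users = [
    {"name": "John Doe", "age": 32},
    {"name": "Jane Roe", "email": "jane@example.com", "tags": ["admin", "beta"]},
]


def field_names(documents):
    """Return the sorted union of field names across all documents."""
    names = set()
    for doc in documents:
        names.update(doc)  # iterating a dict yields its keys
    return sorted(names)


print(field_names(users))
print(json.dumps(users[1], indent=2))
```

In a table-based database the second record would need new columns first; here each document simply carries the fields it has.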

Advantages of MongoDB

Unlike SQL databases, which store data in tables of rows, MongoDB stores data in collections of documents. Documents are JSON-like records whose structure can vary within the same collection, which often makes it easier to store and retrieve data in the shape your application already uses. Under the hood, documents are stored in BSON, a binary representation of JSON that adds types such as dates and binary data, so the document model accommodates most data structures.

MongoDB also positions itself as a cloud-based developer data platform that extends beyond a traditional database. With MongoDB Atlas, developers gain access to integrated services such as the Performance Advisor for optimization, Atlas Search for full-text querying, MongoDB Charts for data visualization, and multi-cloud deployment. Official drivers work with the native data structures of each language, which often reduces the need for an ORM and lets developers manipulate data in formats they already know.

The platform is built for agility: the schema can change without downtime, which fits well with continuous integration and delivery. MongoDB’s querying and aggregation capabilities cover complex retrieval and analytics, and because related data can be embedded in a single document, many queries need no joins at all. When you do need them, joins ($lookup) and multi-document transactions are available. Horizontal scalability is achieved through sharding, which helps maintain performance as data volume grows.

MongoDB Atlas removes most of the installation work with a user-friendly interface and a wide range of deployment options, including serverless databases that charge only for actual usage. The platform is also backed by comprehensive technical documentation, a community forum, and MongoDB University, which offers free courses, alongside support plans for enterprise and paid-tier users.
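
As a rough idea of what the aggregation side looks like from Python, here is a sketch of a pipeline definition. The field names "age" and "city" are assumptions for the sake of the example; the pipeline itself is just a list of dictionaries, so this runs without a database.

```python
def build_age_summary_pipeline(min_age: int) -> list:
    """Build a pipeline: keep documents with age >= min_age, then summarize per city."""
    return [
        # Stage 1: filter, like the query passed to find()
        {"$match": {"age": {"$gte": min_age}}},
        # Stage 2: group by the (hypothetical) "city" field
        {"$group": {"_id": "$city", "count": {"$sum": 1}, "avg_age": {"$avg": "$age"}}},
        # Stage 3: largest groups first
        {"$sort": {"count": -1}},
    ]


pipeline = build_age_summary_pipeline(18)
# With a real pymongo collection this would be run as:
#     results = list(collection.aggregate(pipeline))
print(pipeline)
```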

For more information, visit the MongoDB website.

Reading data from MongoDB

Here is a simple Python example that connects to MongoDB with the pymongo library and reads data from a collection.

import json
import logging

import pymongo

logger = logging.getLogger(__name__)

def convert_to_string(data_list: list) -> list:
    """Converts data to string for MongoDB insertion. May or may not be needed depending on the data being inserted"""
    try:
        for data in data_list:
            for col in data:
                if isinstance(data[col], (dict, list)):
                    data[col] = json.dumps(data[col])

    except Exception as e:
        logger.debug(f"An error occurred while converting data: {e}")

    return data_list

class Mongo:
    def __init__(self):
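        # `secrets` stands in for your own secrets helper (not the stdlib module);
        # it should return the MongoDB connection string.
        self.creds = secrets.get("path/to/creds")
        self.client = pymongo.MongoClient(self.creds)
        self.db = self.client["database"]
        self.collection = self.db["collections"]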

    def get_all_documents(self) -> list:
        """Read every document in the collection into a list."""
        # find({}) returns a cursor; list() consumes it
        return list(self.collection.find({}))

    @staticmethod
    def return_data(documents: list) -> list:
        for doc in documents:
            doc["_id"] = str(doc["_id"])

        documents = convert_to_string(documents)

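        # MongoData is a model class defined elsewhere (for example a dataclass)
        # whose fields match the document keys.
        return [MongoData(**data) for data in documents]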

def main():
    mongo = Mongo()
    documents = mongo.get_all_documents()
    data = mongo.return_data(documents)

if __name__ == "__main__":
    main()
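
The MongoData class used in return_data is not shown in the snippet above. One possible shape, purely as an assumption, is a dataclass whose fields mirror the document keys. The fields below (name, address) are hypothetical, and the helper repeats the same conversion steps in a way that does not modify the input documents.

```python
import json
from dataclasses import dataclass


@dataclass
class MongoData:
    """Hypothetical model; the real fields depend on your collection."""
    _id: str
    name: str
    address: str  # nested values arrive as JSON strings after conversion


def to_models(documents: list) -> list:
    """Stringify _id and nested values, then build MongoData instances."""
    models = []
    for doc in documents:
        row = dict(doc)  # shallow copy so the caller's documents stay untouched
        row["_id"] = str(row["_id"])
        for key, value in row.items():
            if isinstance(value, (dict, list)):
                row[key] = json.dumps(value)
        models.append(MongoData(**row))
    return models


sample = [{"_id": 1, "name": "John Doe", "address": {"city": "Oslo"}}]
print(to_models(sample))
```

A document with a field the dataclass does not declare would raise a TypeError here, which can be a useful early warning that the collection's structure has drifted.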

Note that the line documents = collection.find({}) returns a cursor over all documents in the collection. If you want to filter the documents, you can pass a query to the find() method.

For example, documents = collection.find({"name": "John Doe"}) will return all documents where the name field is “John Doe”.

To filter data based on a range, you can use the $gt, $lt, $gte, and $lte operators. For example, documents = collection.find({"age": {"$gt": 18}}) will return all documents where the age field is greater than 18.

I once worked with a collection that for some reason contained dates in the year 0000. Python's datetime cannot represent those (datetime.min is in year 1), so reading them failed. To get around this, I limited the query to the range that datetime supports:

from datetime import datetime
documents = collection.find({"DateField": {"$gte": datetime.min, "$lte": datetime.max}})
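
If the operator semantics feel abstract, here is a tiny pure-Python matcher that mimics equality and the four range operators on in-memory dictionaries. It is only an illustration under simplifying assumptions (top-level fields only, a missing field never matches) and not how MongoDB evaluates queries internally. The sample people are made up.

```python
import operator

# Map each supported query operator to the equivalent Python comparison.
_COMPARATORS = {
    "$gt": operator.gt,
    "$lt": operator.lt,
    "$gte": operator.ge,
    "$lte": operator.le,
}


def matches(document: dict, query: dict) -> bool:
    """Return True if the document satisfies every condition in the query."""
    for field, condition in query.items():
        if field not in document:
            return False  # simplification: missing fields never match
        value = document[field]
        if isinstance(condition, dict):
            # Range condition such as {"$gte": 18, "$lte": 65}
            for op, bound in condition.items():
                if not _COMPARATORS[op](value, bound):
                    return False
        elif value != condition:
            # Plain equality such as {"name": "John Doe"}
            return False
    return True


people = [
    {"name": "John Doe", "age": 17},
    {"name": "Jane Roe", "age": 18},
    {"name": "Sam Poe", "age": 42},
]
adults = [p["name"] for p in people if matches(p, {"age": {"$gt": 18}})]
print(adults)  # ['Sam Poe']
```

Note that $gt is strict, so the 18-year-old is excluded; $gte would include her.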

Writing data from a Kafka topic to MongoDB

First, connect to MongoDB with the pymongo library.

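from pymongo import MongoClient

# MONGODB_CLIENT (the connection string), MONGODB_NAME and MONGODB_COLLECTION
# are configuration constants assumed to be defined elsewhere.


def create_client():
    return MongoClient(MONGODB_CLIENT)


def get_collection():
    client = create_client()
    db = client[MONGODB_NAME]
    return db[MONGODB_COLLECTION]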

Setting up a Kafka consumer to read data from a Kafka topic and write it to MongoDB.

from confluent_kafka import Consumer


def commit_completed(err, partitions):
    """Commit callback: log a failed offset commit."""
    if err:
        logger.error("Commit failed: %s", err)


def create_consumer():
    conf = {
        "group.id": KAFKA_GROUP_ID,
        "bootstrap.servers": KAFKA_SERVER,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-256",
        "sasl.username": KAFKA_USERNAME,
        "sasl.password": KAFKA_PASSWORD,
        "on_commit": commit_completed,
    }
    con = Consumer(conf)
    con.subscribe([KAFKA_TOPIC])
    return con
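
The KAFKA_* constants are not defined in the snippets. One common option, sketched here as an assumption rather than what the original setup did, is to read them from environment variables. The variable names are made up to mirror the constants, and the on_commit callback would still be added to the dictionary separately.

```python
import os


def build_consumer_config(env=os.environ) -> dict:
    """Build the consumer settings from environment variables.

    A missing variable raises KeyError, so misconfiguration fails at startup
    instead of at the first poll.
    """
    return {
        "group.id": env["KAFKA_GROUP_ID"],
        "bootstrap.servers": env["KAFKA_SERVER"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-256",
        "sasl.username": env["KAFKA_USERNAME"],
        "sasl.password": env["KAFKA_PASSWORD"],
    }
```

Passing the environment in as a parameter keeps the function easy to try out with a plain dictionary instead of real credentials.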

Putting it all together.

import json
import logging

from pymongo.errors import DuplicateKeyError

logger = logging.getLogger(__name__)


def main():
    logger.info("Starting consume from kafka to Mongo")
    consumer = create_consumer()
    mongo_collection = get_collection()
    try:
        while True:
            msg = consumer.poll(1.0)  # wait up to one second for a message
            if msg is None:
                continue
            if msg.error():
                logger.error(msg.error())
                continue
            try:
                json_data = json.loads(msg.value().decode("utf-8"))
                mongo_collection.insert_one(json_data)
            except json.JSONDecodeError as e:
                logger.error(f"JSON decode error for message {msg.value()}: {e}")
            except DuplicateKeyError:
                # default=str copes with the ObjectId that insert_one may add as _id
                formatted_json = json.dumps(
                    json_data, indent=4, sort_keys=True, ensure_ascii=False, default=str
                )
                logger.warning(f"Duplicate entry: {formatted_json}")
    finally:
        # Leave the consumer group cleanly, also on Ctrl+C
        consumer.close()


if __name__ == "__main__":
    main()
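
If duplicates are expected, an alternative to catching DuplicateKeyError is to make the write idempotent with an upsert. The sketch below is one way to do that, assuming every message carries its own _id field, which the original messages may or may not do. handle_message is a hypothetical helper; collection can be any object with pymongo's replace_one method.

```python
import json
import logging

logger = logging.getLogger(__name__)


def handle_message(raw_value: bytes, collection) -> bool:
    """Decode one Kafka message value and upsert it; return True if written."""
    try:
        document = json.loads(raw_value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        logger.error("Skipping undecodable message: %s", exc)
        return False
    if not isinstance(document, dict) or "_id" not in document:
        logger.warning("Skipping message without an _id field")
        return False
    # Insert the document, or overwrite the existing one with the same _id
    collection.replace_one({"_id": document["_id"]}, document, upsert=True)
    return True
```

Inside the loop this would replace the json.loads and insert_one lines, for example handle_message(msg.value(), mongo_collection). The trade-off is that a re-delivered message silently overwrites the stored document instead of being logged as a duplicate.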

Final thoughts

MongoDB is a powerful and flexible database that suits a wide range of applications. At the end of the day, whether or not you choose MongoDB will depend on your specific use case and requirements. If the use case is more analytics related, then Snowflake, Databricks or BigQuery might be a better choice, since Mongo’s support for analytics tools like Power BI or dbt is currently still limited. If the use case is storing data for small apps, then MongoDB might be the better choice.

Thank you for reading and have a nice day!

Honestly, if you made it this far you already made my day :)