An Open Data Stack Part 3

Part 3: The Bronze Layer

Previous Parts

In this part of the series, I’m going to focus on setting up just the bronze layer of my data infrastructure. I drink the kool-aid of the medallion architecture, so I’ll keep this post in line with that idea.

The Bronze layer is the ‘rawest’ form of our data pipeline. It is typically an exact representation of your production data, dumped as-is into a storage account or some other commodity object store for fast, cheap storage. This layer is optimized for reading and writing individual files, and it’s not really the exciting part of your data stack.

So for this phase of the tutorial, we’re going to use the Dagster instance we set up in the previous parts and start stitching some concepts together. We’re going to use Dagster to connect to an API and write data directly to our MinIO storage bucket.

Organizing our S3 Buckets

We’re going to need to do a few things to our MinIO setup to enable connectivity from Dagster. We could do this via the UI, but given the recent changes to MinIO by its maintainers, we can’t even use the UI to set this up anymore. So their paywall actually encourages good design of our open source data stack :)

First, we need to add a MINIO_ACCESS_KEY and MINIO_SECRET_KEY to the .env file used by our docker-compose. These can be randomly generated strings.

...
# Minio Setup
MINIO_ACCESS_KEY="ajsdlkfajslkasjdoifau"
MINIO_SECRET_KEY="asdjkhfasdkfhasduihakjh"
MINIO_USER=minio
MINIO_PASS=minio123
MINIO_ENDPOINT_URL="http://minio:9000"
...

Then we use the MinIO Client (mc) to set up the access keys and create the bronze bucket. Add a new service to your docker-compose like the one below:

services:
  ...
  minio-setup:
    image: minio/mc
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      /bin/sh -c "
      echo 'Waiting for MinIO to be ready...';
      sleep 5;
      /usr/bin/mc alias set myminio http://minio:9000 ${MINIO_USER} ${MINIO_PASS};
      /usr/bin/mc mb myminio/bronze --ignore-existing;
      /usr/bin/mc admin user add myminio ${MINIO_ACCESS_KEY} ${MINIO_SECRET_KEY};
      /usr/bin/mc admin policy attach myminio readwrite --user ${MINIO_ACCESS_KEY};
      exit 0;
      "
    networks:
      - docker_example_network
    ...

Now when you restart your stack with docker compose up --build, you’re going to have an access key and secret key set up in MinIO, and you’ll see a new bronze bucket if you open the MinIO UI at localhost:9000.
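If you want to sanity-check the new credentials from outside the containers, a small boto3 snippet works (a sketch only: it assumes your .env values are exported in your shell and that MinIO’s API port 9000 is published on localhost; inside the Docker network you’d use http://minio:9000 instead — boto3 itself gets a proper introduction later in this post):

import os
import boto3

# List buckets with the newly created access key; 'bronze' should be in the output.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
    aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
)
print([b["Name"] for b in s3.list_buckets()["Buckets"]])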

Dagster Code

I’m going to create a brand new project called nasaproject because… I’m going to connect to NASA’s API and play with some of their data. After much deliberation, my interest in NEOs (near earth objects), and in surfacing just how close we are to obliteration at any given minute, won out. I think this would be a really cool thing to dashboard, so that’s where we’re going to start!

First, if you don’t have an API key from NASA, go get yourself one from here: https://api.nasa.gov. You could also use the DEMO_KEY but you’re going to get throttled very quickly.

Next, you’re going to want to add that API key to your .env file.

# NASA Setup
NASA_API_KEY="DEMO_KEY"
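If you want to confirm the key works before wiring it into Dagster, a quick one-off call to the NeoWs feed endpoint does the trick (a sketch using the requests library; it should print the number of NEOs reported for that day):

import os
import requests

# One-off call to NASA's NeoWs feed endpoint for a single day.
resp = requests.get(
    "https://api.nasa.gov/neo/rest/v1/feed",
    params={
        "start_date": "2026-01-01",
        "end_date": "2026-01-01",
        "api_key": os.environ.get("NASA_API_KEY", "DEMO_KEY"),
    },
    timeout=30,
)
resp.raise_for_status()
print(resp.json()["element_count"])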

Next, you’re going to want to make sure that you have a Python environment ready to go.

uv venv
source .venv/bin/activate

Now, we code! First we’ll scaffold the project

mkdir nasaproject
cd nasaproject
uv init
uv add dagster
mkdir src
touch src/definitions.py

We’ll start with a barebones definitions.py

from dagster import Definitions

defs = Definitions(
    assets=[],
    resources={},
)

Next, we start creating our resources.

For this part of the tutorial, we’ll start organizing our code a bit better.

mkdir src/defs
touch src/defs/assets.py
touch src/defs/resources.py

assets.py is where we define our Dagster assets, and resources.py is where we define the APIs and services for interacting with our stack. To begin, let’s create an S3-compatible storage broker for MinIO!

import boto3
from dagster import ConfigurableResource, EnvVar
import json
from pydantic import Field
from typing import Any

class MinIOResource(ConfigurableResource):
    """Resource for interacting with MinIO (S3-compatible object storage)."""
    
    endpoint_url: str = Field(
        description="MinIO endpoint URL"
    )
    access_key: str = Field(
        description="MinIO access key"
    )
    secret_key: str = Field(
        description="MinIO secret key"
    )
    region_name: str = Field(
        default="us-east-1",
        description="AWS region name (required even for MinIO)"
    )
    
    def get_client(self):
        """Get an S3 client configured for MinIO."""
        return boto3.client(
            's3',
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            region_name=self.region_name,
        )
    
    def get_resource(self):
        """Get an S3 resource configured for MinIO."""
        return boto3.resource(
            's3',
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            region_name=self.region_name,
        )
    
    def write_json(self, bucket: str, key: str, data: Any) -> None:
        """Write JSON data to MinIO."""
        s3_client = self.get_client()
        json_data = json.dumps(data, indent=2)
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json_data.encode('utf-8'),
            ContentType='application/json'
        )


# Instantiate the resource with environment variables
minio_resource = MinIOResource(
    endpoint_url=EnvVar("MINIO_ENDPOINT_URL"),
    access_key=EnvVar("MINIO_ACCESS_KEY"),
    secret_key=EnvVar("MINIO_SECRET_KEY"),
)

You’ll notice we have to add another package here: boto3, the AWS SDK for Python, which we’ll use to talk to S3-compatible storage.

uv add boto3
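Nothing about this resource is Dagster-specific beyond the ConfigurableResource base class, so you can poke at it directly in a REPL. A rough sketch, reusing the example credentials from the .env above and localhost since you’d be calling from outside the Docker network:

from defs.resources import MinIOResource

# Hypothetical standalone usage, just to show the shape of the API.
minio = MinIOResource(
    endpoint_url="http://localhost:9000",
    access_key="ajsdlkfajslkasjdoifau",
    secret_key="asdjkhfasdkfhasduihakjh",
)
minio.write_json(bucket="bronze", key="smoke_test/hello.json", data={"hello": "world"})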

Then, back in your definitions.py, add the resource to the resources dict of your defs object.

from dagster import Definitions
from defs.resources import minio_resource

defs = Definitions(
    assets=[],
    resources={"minio": minio_resource},
)

Next we need to wire up our API. We’re going to do this by declaring a generic API client, then extending it for the NASA API. Ready?

...
# Additional imports needed at the top of resources.py for the generic client:
import requests
from datetime import datetime, timedelta
from typing import Dict, Optional

class APIClientResource(ConfigurableResource):
    """Generic API client resource. Extend this for specific APIs."""

    base_url: str = Field(
        description="Base URL for the API"
    )
    api_key: Optional[str] = Field(
        default=None,
        description="API key for authentication (if using API key auth)"
    )
    username: Optional[str] = Field(
        default=None,
        description="Username for authentication (if using username/password)"
    )
    password: Optional[str] = Field(
        default=None,
        description="Password for authentication"
    )
    timeout: int = Field(
        default=30,
        description="Request timeout in seconds"
    )
    
    def __init__(self, **data):
        super().__init__(**data)
        self._session = None
        self._token = None
        self._token_expires_at = None
    
    def _get_session(self) -> requests.Session:
        """Get or create a requests session."""
        if self._session is None:
            self._session = requests.Session()
            # Set default headers
            self._session.headers.update({
                'User-Agent': 'Dagster-API-Client/1.0',
                'Content-Type': 'application/json'
            })
        return self._session
    
    def _authenticate(self) -> str:
        """
        Perform authentication and return access token.
        Override this method for specific API authentication flows.
        """
        if self.api_key:
            # Simple API key auth
            return self.api_key
        
        if self.username and self.password:
            # Username/password auth - example for OAuth2 or similar
            session = self._get_session()
            response = session.post(
                f"{self.base_url}/auth/token",
                json={
                    "username": self.username,
                    "password": self.password
                },
                timeout=self.timeout
            )
            response.raise_for_status()
            data = response.json()
            
            # Store token and expiration
            self._token = data.get('access_token')
            expires_in = data.get('expires_in', 3600)  # Default 1 hour
            self._token_expires_at = datetime.now() + timedelta(seconds=expires_in)
            
            return self._token
        
        return None
    
    def _ensure_authenticated(self):
        """Ensure we have a valid token."""
        if self._token is None or (self._token_expires_at and datetime.now() >= self._token_expires_at):
            token = self._authenticate()
            if token:
                session = self._get_session()
                if self.api_key:
                    # API key in header
                    session.headers.update({'X-API-Key': token})
                else:
                    # Bearer token
                    session.headers.update({'Authorization': f'Bearer {token}'})
    
    def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Make a GET request to the API.
        
        Args:
            endpoint: API endpoint (will be appended to base_url)
            params: Query parameters
            
        Returns:
            JSON response as dictionary
        """
        self._ensure_authenticated()
        session = self._get_session()
        
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        response = session.get(url, params=params, timeout=self.timeout)
        response.raise_for_status()
        
        return response.json()
    
    def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Make a POST request to the API.
        
        Args:
            endpoint: API endpoint
            data: Request body data
            
        Returns:
            JSON response as dictionary
        """
        self._ensure_authenticated()
        session = self._get_session()
        
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        response = session.post(url, json=data, timeout=self.timeout)
        response.raise_for_status()
        
        return response.json()
    
    def close(self):
        """Close the session."""
        if self._session:
            self._session.close()
            self._session = None

class NASANeoWsAPIResource(APIClientResource):
    """
    NASA NeoWs (Near Earth Object Web Service) API client.
    Provides access to near earth asteroid information.
    
    Get your API key from: https://api.nasa.gov/
    """
    
    def __init__(self, **data):
        # Set default base_url for NASA API
        if 'base_url' not in data:
            data['base_url'] = "https://api.nasa.gov/neo/rest/v1"
        super().__init__(**data)
    
    
    def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Make a GET request to NASA API with api_key in query params.
        """
        session = self._get_session()
        
        # Add api_key to params
        if params is None:
            params = {}
        params['api_key'] = self.api_key or 'DEMO_KEY'
        
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        response = session.get(url, params=params, timeout=self.timeout)
        response.raise_for_status()
        
        return response.json()
    
    def get_feed(self, start_date: str, end_date: str) -> Dict[str, Any]:
        params = {
            'start_date': start_date,
            'end_date': end_date
        }
        return self.get('feed', params=params)

# Instantiate NASA NeoWs resource
nasa_neows_api = NASANeoWsAPIResource(
    api_key=EnvVar("NASA_API_KEY")  # Falls back to DEMO_KEY if not set
)
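As with the MinIO resource, you can exercise this client directly before handing it to Dagster. A rough sketch (DEMO_KEY works, but is heavily throttled):

from defs.resources import NASANeoWsAPIResource

# Ad-hoc check of the NeoWs client outside of Dagster.
api = NASANeoWsAPIResource(api_key="DEMO_KEY")
feed = api.get_feed(start_date="2026-01-01", end_date="2026-01-01")
print(feed["element_count"])
print(list(feed["near_earth_objects"].keys()))  # dates covered by the feed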

Now we can import the resources in our definitions.py file

from dagster import Definitions
from defs.resources import minio_resource, nasa_neows_api

defs = Definitions(
    assets=[],
    resources={"minio": minio_resource, "nasa_neows_api": nasa_neows_api},
)

Next, we’ll create our first asset! We’re going to load the NEOs every day. For brevity, we’re going to just shove everything into an assets.py file, but there’s something to be said for splitting your partition strategies out into other files. First, we’re going to create our partitions definition. Partitions are really about how you expect your data to be organized in its raw form. The NeoWs API reports NEOs by date, so it’s reasonable to partition daily. If your data changes once per month (or you only need to refresh it once per month), you could make them monthly! It’s really up to you.

import dagster as dg
from datetime import datetime
from defs.resources import NASANeoWsAPIResource, MinIOResource

# Define daily partitions starting from Jan 1, 2026
daily_partitions = dg.DailyPartitionsDefinition(start_date="2026-01-01")
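To make the link between partitions and the bronze layout concrete: each partition key is just a 'YYYY-MM-DD' string, and the asset below turns it into a year/month/day object key. A small illustration (the key format matches what the asset writes; the time component is always 000000 because the partition key carries no time of day):

from datetime import datetime

# How a partition key maps to the bronze object key used by the asset below.
partition_key = "2026-01-18"
dt = datetime.strptime(partition_key, "%Y-%m-%d")
key = f"nasa_neows_api/{dt.strftime('%Y/%m/%d')}/{dt.strftime('%H%M%S')}.json"
print(key)  # nasa_neows_api/2026/01/18/000000.json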

Onto the asset definition. We’re going to write an asset that, during its execution, calls the API for the partition’s date (very important so you can re-run old partitions idempotently) and then writes the response to our MinIO bucket according to our partition strategy (year / month / day).

...
@dg.asset(
    partitions_def=daily_partitions,
    group_name="bronze",
    metadata={
        "source": "nasa_neos_api",
        "entity": "near_earth_objects",
        "description": "Near Earth asteroids approaching Earth for a specific day"
    }
)
def bronze_nasa_neos(
    context: dg.AssetExecutionContext, 
    nasa_neows_api: NASANeoWsAPIResource,
    minio: MinIOResource
) -> dict:
    """
    Fetch near earth object feed from NASA NeoWs API for a specific partition date.
    Returns asteroids approaching Earth on the partition date.
    """
    # Get the partition date
    partition_date = context.partition_key
    
    context.log.info(f"Fetching NEO feed for {partition_date}...")
    
    # Fetch data for the specific date
    data = nasa_neows_api.get_feed(
        start_date=partition_date,
        end_date=partition_date
    )
    
    # Extract objects for this date
    near_earth_objects = data.get('near_earth_objects', {}).get(partition_date, [])
    
    # Count potentially hazardous asteroids
    hazardous_count = sum(1 for obj in near_earth_objects if obj.get('is_potentially_hazardous_asteroid'))
    
    context.log.info(f"Found {len(near_earth_objects)} near earth objects on {partition_date}")
    context.log.info(f"{hazardous_count} are potentially hazardous")
    
    # Prepare result
    result = {
        "partition_date": partition_date,
        "element_count": len(near_earth_objects),
        "potentially_hazardous": hazardous_count,
        "data": near_earth_objects
    }
    
    # Write to MinIO bronze layer
    # Parse partition date for timestamp
    try:
        context.log.info("About to write to MinIO...")
        partition_dt = datetime.strptime(partition_date, "%Y-%m-%d")
        context.log.info(f"Parsed partition date: {partition_dt}")
        bucket="bronze"
        key=f"nasa_neows_api/{partition_dt.strftime('%Y/%m/%d')}/{partition_dt.strftime('%H%M%S')}.json"
        minio.write_json(
            bucket=bucket,
            key=key,
            data=near_earth_objects,
        )
        context.log.info(f"Wrote data to MinIO: s3://bronze/{key}")
    except Exception as e:
        context.log.error(f"Failed to write to MinIO: {type(e).__name__}: {str(e)}")
        import traceback
        context.log.error(traceback.format_exc())
    
    return result
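Before wiring this into Docker, you can test the asset locally with Dagster’s materialize() helper. A sketch, assuming the MinIO and NASA env vars are set, MINIO_ENDPOINT_URL points at an address reachable from wherever you run it (e.g. http://localhost:9000 from the host), and you run it from the src directory so defs is importable:

import dagster as dg
from defs.assets import bronze_nasa_neos
from defs.resources import minio_resource, nasa_neows_api

# Materialize a single partition locally, outside the Docker deployment.
result = dg.materialize(
    [bronze_nasa_neos],
    resources={"minio": minio_resource, "nasa_neows_api": nasa_neows_api},
    partition_key="2026-01-18",
)
assert result.success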

We’ll update our definitions.py again to include our assets

from dagster import Definitions
from defs.assets import bronze_nasa_neos
from defs.resources import minio_resource, nasa_neows_api

defs = Definitions(
    assets=[bronze_nasa_neos],
    resources={"minio": minio_resource, "nasa_neows_api": nasa_neows_api},
)

Now, to make this all show up in our previous Docker setup, we need to add a grpc_server entry for it to the workspace.yaml file identified in our previous part:

load_from:
  - grpc_server:
      host: docker_nasa_user_code
      port: 4000
      location_name: nasaproject

And last but not least, we’re going to need to add our code location to docker compose so that the container will load and integrate with Dagster. Create a new Dockerfile called Dockerfile_code_location_2 in your project root:

FROM python:3.12-slim

RUN pip install \
    dagster \
    dagster-postgres \
    dagster-docker \
    dagster-aws \
    boto3 \
    python-dotenv \
    requests \
    psycopg2-binary

# Set up the working directory
WORKDIR /opt/dagster/app

# Copy the entire project including pyproject.toml
COPY dagsterCode/nasaproject /opt/dagster/app/nasaproject

# Set working directory to the project root
WORKDIR /opt/dagster/app/nasaproject

# Install project dependencies from pyproject.toml
RUN pip install -e .

# Set PYTHONPATH so the definitions module and defs package can be imported
ENV PYTHONPATH=/opt/dagster/app/nasaproject/src

# Run dagster code server on port 4000
EXPOSE 4000

HEALTHCHECK --timeout=1s --start-period=3s --interval=3s --retries=20 CMD ["dagster", "api", "grpc-health-check", "-p", "4000"]

# CMD allows this to be overridden from run launchers or executors to execute runs and steps
CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-m", "definitions"]

I found myself sharing a common set of environment variables across my Docker containers, so I abstracted them into a shareable block in my docker-compose. Add the env config to the top of your docker-compose.yaml file so you can share the environment (*dagster-env) across a bunch of containers. Maybe this isn’t best practice, but this is a PoC, not prod! This config will read your environment variables from the .env file mentioned previously.

x-dagster-env: &dagster-env
  DAGSTER_POSTGRES_USER: ${POSTGRES_USER}
  DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASS}
  DAGSTER_POSTGRES_DB: ${POSTGRES_DB}
  NASA_API_KEY: ${NASA_API_KEY}
  MINIO_ENDPOINT_URL: http://minio:9000
  MINIO_ACCESS_KEY: ${MINIO_ACCESS_KEY}
  MINIO_SECRET_KEY: ${MINIO_SECRET_KEY}
  POSTGRES_HOST: docker_example_postgresql
  POSTGRES_PORT: 5432
  POSTGRES_DB: ${POSTGRES_DB}
  POSTGRES_USER: ${POSTGRES_USER}
  POSTGRES_PASSWORD: ${POSTGRES_PASS}
  DL_POSTGRES_HOST: ${DL_POSTGRES_HOST}
  DL_POSTGRES_PORT: ${DL_POSTGRES_PORT}
  DL_POSTGRES_DB: ${DL_POSTGRES_DB}
  DL_POSTGRES_USER: ${DL_POSTGRES_USER}
  DL_POSTGRES_PASS: ${DL_POSTGRES_PASS}
  ICEBERG_CATALOG_URI: ${ICEBERG_CATALOG_URI}
services:
    ...
    docker_nasa_user_code:
        build:
            context: .
            dockerfile: ./Dockerfile_code_location_2
        container_name: docker_nasaproject_code
        image: docker_nasaproject_code_image
        restart: always
        environment:
            <<: *dagster-env
            DAGSTER_CURRENT_IMAGE: "docker_nasaproject_code_image"
        expose:
            - "4000"
        networks:
            - docker_example_network
        depends_on:
            db_migrations:
                condition: service_completed_successfully

Now, if the coding gods are with us, running docker compose up --build will rebuild our stack and we’re going to see the new project set up in Dagster! Once it’s done, go to localhost:3000, open Deployments, click nasaproject, then Assets, then bronze, and you will see our asset there!

Dagster Deployment Information

Click into the asset and press the Materialize button in the top right. To test, just run it for the latest partition and press Launch 1 run. Click on the little dialog that pops up in the bottom-right to see the results.

Dagster Run Info

Success! You can go to your MinIO instance, select your bronze bucket, and click into nasa_neows_api/2026/01/18 (or whatever partition you ran), and you’ll see that we have a JSON file called 000000.json that is the output of our pipeline!
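You can also pull the file back down programmatically to see exactly what landed in bronze. A sketch with boto3 (adjust the key to the partition you actually ran, and the endpoint and credentials to your setup):

import json
import os
import boto3

# Read the bronze object back and count the NEOs it contains.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
    aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
)
obj = s3.get_object(Bucket="bronze", Key="nasa_neows_api/2026/01/18/000000.json")
neos = json.loads(obj["Body"].read())
print(len(neos), "near earth objects")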

We’ve successfully implemented one asset in Dagster! Now, I wouldn’t recommend any big data analysis off of this one file, so that’s where we’re going to leave it for now. But in the next part, we’re going to start moving the bronze data to silver and eventually to gold so we can build some neat dashboards!

Bonus!

You can create schedules and jobs for the partition if you’re interested. I’m going to show you how to do it, though in my particular use case I won’t be leaving my dev machine on all the time, so the schedule isn’t likely to be very exciting. I do believe it’s resilient, though, and will ‘kick off’ the next time your environment spins up, which could be cool!

In assets.py, at the bottom, add the following lines to create a job for the asset, then a schedule built from that job and aligned with the partition configuration:

...
# Define the asset job
neos_partitions_job = dg.define_asset_job("partitioned_job", selection=[bronze_nasa_neos])

# This schedule will run daily
asset_partitioned_schedule = dg.build_schedule_from_partitioned_job(
    neos_partitions_job,
)
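By default, build_schedule_from_partitioned_job derives its cadence from the partition definition (daily, in this case). If you want explicit control over when the tick fires, it also accepts hour/minute arguments; a small sketch (the 06:00 time here is just an example):

# Alternative: pin the daily tick to a specific time instead of the default.
asset_partitioned_schedule = dg.build_schedule_from_partitioned_job(
    neos_partitions_job,
    hour_of_day=6,
    minute_of_hour=0,
)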

And finally, add it to your definitions.py

from dagster import Definitions
from defs.assets import bronze_nasa_neos, asset_partitioned_schedule
from defs.resources import minio_resource, nasa_neows_api

defs = Definitions(
    assets=[bronze_nasa_neos],
    resources={"minio": minio_resource, "nasa_neows_api": nasa_neows_api},
    schedules=[asset_partitioned_schedule],
)

Restart your environment with docker compose up --build and navigate to the new navigation items in your side menu! Make sure to turn the schedule on, as it is off by default.

Dagster Schedule for Partition

And that’s it! Hopefully I’ll get to Part #4 in less than a year’s time :)