about analyzing crime tendencies in your native space. You understand that related knowledge exists, and you’ve got some primary analytical expertise that you need to use to research this knowledge. Nevertheless, this knowledge is altering regularly, and also you wish to preserve your evaluation up to date with the newest crime incidents with out repeating your evaluation. How can we automate this course of?
Properly, should you’ve stumbled upon this text, you’re in luck! Collectively, we’ll stroll via learn how to create an information pipeline to extract native police log knowledge, and join this to a visualization platform to look at native crime tendencies over time. For this text, we’ll extract knowledge on incidents reported to the Cambridge (MA) Police Division (CPD), after which visualize this knowledge as a dashboard in Metabase.

Moreover, this text can function a normal template for anyone seeking to write ETL pipelines orchestrated in Prefect, and/or anyone who desires to attach Metabase to their knowledge shops to create insightful analyses/stories.
Observe: I’ve no affiliation with Metabase – we’ll merely use Metabase for example platform to create our closing dashboard. There are a lot of different viable alternate options, that are described on this part.
Contents:
Background Data
Earlier than we dive into the pipeline, it’ll be useful to overview the next ideas, or preserve these hyperlinks as reference as you learn.
Information of Curiosity
The info we’ll be working with incorporates a group of police log entries, the place every entry is a single incident reported to/by the CPD. Every entry incorporates complete data describing the incident, together with however not restricted to:
- Date & time of the incident
- Sort of incident that occurred
- The road the place the incident befell
- A plaintext description of what occurred

Try the portal for extra details about the info.
For monitoring crime tendencies in Cambridge, MA, creating an information pipeline to extract this knowledge is suitable, as the info is up to date every day (in accordance with their web site). If the info was up to date much less regularly (e.g. yearly), then creating an information pipeline to automate this course of wouldn’t save us a lot effort. We may merely revisit the info portal on the finish of every 12 months, obtain the .csv, and full our evaluation.
Now that we’ve discovered the suitable dataset, let’s stroll via the implementation.
ETL Pipeline
To go from uncooked CPD log knowledge to a Metabase dashboard, our mission will encompass the next main steps:
- Extract the info by utilizing its corresponding API.
- Reworking it to organize it for storage.
- Loading it right into a PostgreSQL database.
- Visualizing the info in Metabase.
The info circulate of our system will seem like the next:

Our pipeline follows an ETL workflow, which implies that we’ll remodel the info earlier than importing it into PostgreSQL. This requires loading knowledge into reminiscence whereas executing knowledge transformations, which can be problematic for giant datasets which can be too large to slot in reminiscence. On this case, we could contemplate an ELT workflow, the place we remodel the info in the identical infrastructure the place it’s saved. Since our dataset is small (<10k rows), this shouldn’t be an issue, and we’ll benefit from the truth that pandas makes knowledge transformation simple.
We’ll extract the CPD log knowledge by making a request for the dataset to the Socrata Open Information API. We’ll use sodapy — a python consumer for the API — to make the request.
We’ll encapsulate this extraction code in its personal file — extract.py.
import pandas as pd
from sodapy import Socrata
from dotenv import load_dotenv
import os
from prefect import job
@job(retries=3, retry_delay_seconds=[10, 10, 10]) # retry API request in case of failure
def extract_data():
'''
Extract incident knowledge reported to the Cambridge Police Division utilizing the Socrata Open Information API.
Return the incident knowledge as a Pandas DataFrame.
'''
# fetch Socrata app token from .env
# embrace this app token when interacting with the Socrata API to keep away from request throttling, so we will fetch all of the incidents
load_dotenv()
APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN")
# create Socrata consumer to work together with the Socrata API (https://github.com/afeld/sodapy)
consumer = Socrata(
"knowledge.cambridgema.gov",
APP_TOKEN,
timeout=30 # enhance timeout from 10s default - generally, it takes longer to fetch all the outcomes
)
# fetch all knowledge, paginating over outcomes
DATASET_ID = "3gki-wyrb" # distinctive identifier for Cambridge Police Log knowledge (https://knowledge.cambridgema.gov/Public-Security/Each day-Police-Log/3gki-wyrb/about_data)
outcomes = consumer.get_all(DATASET_ID)
# Convert to pandas DataFrame
results_df = pd.DataFrame.from_records(outcomes)
return results_df
Notes concerning the code:
- Socrata throttles requests should you don’t embrace an app token that uniquely identifies your utility. To fetch all the outcomes, we’ll embrace this token in our request and put this in a .env file to maintain this out of our supply code.
- We’ll specify a 30 second timeout (as a substitute of the ten second default timeout) when making our request to the Socrata API. From expertise utilizing the API, fetching all the outcomes may generally take longer than 10 seconds, and 30 seconds was usually sufficient to keep away from timeout errors.
- We’ll load the fetched outcomes right into a pandas DataFrame, since we’ll validate and remodel this knowledge utilizing pandas.
ETL: Validate
Now, we’ll do some primary knowledge high quality checks on the info.
The info is already pretty clear (which is sensible because it’s offered by the Cambridge Police Division). So, our knowledge high quality checks will act extra as a “sanity test” that we didn’t ingest something sudden.
We’ll validate the next:
- All of the anticipated columns (as specified right here) are current.
- All IDs are numeric.
- Datetimes observe ISO 8601 format.
- There aren’t any lacking values in columns that ought to comprise knowledge. Particularly, every incident ought to have a Datetime, ID, Sort, and Location.
We’ll put this validation code in its personal file — validate.py.
from datetime import datetime
from collections import Counter
import pandas as pd
from prefect import job
### UTILITIES
def check_valid_schema(df):
'''
Verify whether or not the DataFrame content material incorporates the anticipated columns for the Cambridge Police dataset.
In any other case, increase an error.
'''
SCHEMA_COLS = ['date_time', 'id', 'type', 'subtype', 'location', 'last_updated', 'description']
if Counter(df.columns) != Counter(SCHEMA_COLS):
increase ValueError("Schema doesn't match with the anticipated schema.")
def check_numeric_id(df):
'''
Convert 'id' values to numeric.
If any 'id' values are non-numeric, change them with NaN, to allow them to be eliminated downstream within the knowledge transformations.
'''
df['id'] = pd.to_numeric(df['id'], errors='coerce')
return df
def verify_datetime(df):
'''
Confirm 'date_time' values observe ISO 8601 format (https://www.iso.org/iso-8601-date-and-time-format.html).
Increase a ValueError if any of the 'date_time' values are invalid.
'''
df.apply(lambda row: datetime.fromisoformat(row['date_time']), axis=1)
def check_missing_values(df):
'''
Verify whether or not there are any lacking values in columns that require knowledge.
For police logs, every incident ought to have a datetime, ID, incident sort, and site.
'''
REQUIRED_COLS = ['date_time', 'id', 'type', 'location']
for col in REQUIRED_COLS:
if df[col].isnull().sum() > 0:
increase ValueError(f"Lacking values are current within the '{col}' attribute.")
### VALIDATION LOGIC
@job
def validate_data(df):
'''
Verify the info satisfies the next knowledge high quality checks:
- schema is legitimate
- IDs are numeric
- datetime follows ISO 8601 format
- no lacking values in columns that require knowledge
'''
check_valid_schema(df)
df = check_numeric_id(df)
verify_datetime(df)
check_missing_values(df)
return df
When implementing these knowledge high quality checks, it’s necessary to consider learn how to deal with knowledge high quality checks that fail.
- Do we wish our pipeline to fail loudly (e.g. increase an error/crash)?
- Ought to our pipeline deal with failures silently? As an example, mark knowledge recognized to be invalid in order that it may be eliminated downstream?
We’ll increase an error if:
- The ingested knowledge doesn’t observe the anticipated schema. It doesn’t make sense to course of the info if it doesn’t comprise what we anticipate.
- Datetime doesn’t observe ISO 8601 format. There’s no customary method to convert incorrect datetime values to its corresponding appropriate datetime format.
- The incident incorporates lacking values for any considered one of datetime, ID, sort, and site. With out these values, the incident can’t be described comprehensively.
For data which have non-numeric IDs, we’ll fill them with NaN placeholders after which take away them downstream within the transformation step. These data don’t break our evaluation if we merely take away them.
ETL: Rework
Now, we’ll do some transformations on our knowledge to organize it for storage in PostgreSQL.
We’ll do the next transformations:
- Take away duplicate rows — we’ll use the ‘ID’ column to establish duplicates.
- Take away invalid rows — among the rows that failed the info high quality checks have been marked with an NaN ‘ID’, so we’ll take away these.
- Cut up the datetime column into separate 12 months, month, day, and time columns. In our closing evaluation, we could wish to analyze crime tendencies by these totally different time intervals, so we’ll create these further columns right here to simplify our queries downstream.
We’ll put this transformation code in its personal file — remodel.py.
import pandas as pd
from prefect import job
### UTILITIES
def remove_duplicates(df):
'''
Take away duplicate rows from dataframe primarily based on 'id' column. Preserve the primary incidence.
'''
return df.drop_duplicates(subset=["id"], preserve='first')
def remove_invalid_rows(df):
'''
Take away rows the place the 'id' is NaN, as these IDs have been recognized as non-numeric.
'''
return df.dropna(subset='id')
def split_datetime(df):
'''
Cut up the date_time column into separate 12 months, month, day, and time columns.
'''
# convert to datetime
df['date_time'] = pd.to_datetime(df['date_time'])
# extract 12 months/month/day/time
df['year'] = df['date_time'].dt.12 months
df['month'] = df['date_time'].dt.month
df['day'] = df['date_time'].dt.day
df['hour'] = df['date_time'].dt.hour
df['minute'] = df['date_time'].dt.minute
df['second'] = df['date_time'].dt.second
return df
### TRANSFORMATION LOGIC
@job
def transform_data(df):
'''
Apply the next transformations to the handed in dataframe:
- deduplicate data (preserve the primary)
- take away invalid rows
- break up datetime into 12 months, month, day, and time columns
'''
df = remove_duplicates(df)
df = remove_invalid_rows(df)
df = split_datetime(df)
return df
ETL: Load
Now our knowledge is able to import into into PostgreSQL.
Earlier than we will import our knowledge, we have to create our PostgreSQL occasion. We’ll create one regionally utilizing a compose file. This file permits us to outline & configure all of the companies that our utility wants.
companies:
postgres_cpd: # postgres occasion for CPD ETL pipeline
picture: postgres:16
container_name: postgres_cpd_dev
atmosphere:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: cpd_db
ports:
- "5433:5432" # Postgres is already on port 5432 on my native machine
volumes:
- pgdata_cpd:/var/lib/postgresql/knowledge
restart: unless-stopped
pgadmin:
picture: dpage/pgadmin4
container_name: pgadmin_dev
atmosphere:
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
ports:
- "8081:80"
depends_on: # do not begin pg_admin till our postgres occasion is working
- postgres_cpd
volumes:
pgdata_cpd: # all knowledge for our postgres_cpd service shall be saved right here
There are two primary companies outlined right here:
- postgres_cpd — That is our PostgreSQL occasion the place we’ll retailer our knowledge.
- pgadmin —DB admin platform which supplies a GUI we will use to question knowledge in our PostgreSQL database. Not functionally required, however helpful for checking the info in our database. For extra data on connecting to your PostgreSQL database in pgAdmin, click on right here.
Let’s spotlight some necessary configuration for our postgres_cpd service:
- container_name: postgres_cpd_dev -> Our service will run in a container (i.e. an remoted course of) named postgres_cpd_dev. Docker generates random container names should you don’t specify this, so assigning a reputation will make it extra simple to work together with the container.
- atmosphere: -> We create a Postgres person from credentials saved in our .env file. Moreover, we create a default database, cpd_dev.
- ports: -> Our PostgreSQL service will hear on port 5432 inside the container. Nevertheless, we’ll map port 5433 on the host machine to port 5432 within the container, permitting us to connect with PostgreSQL from our host machine by way of port 5433.
- volumes: -> Our service will retailer all its knowledge (e.g. configuration, knowledge recordsdata) beneath the next listing inside the container: /var/lib/postgresql/knowledge. We’ll mount this container listing to a named Docker quantity saved on our native machine, pgdata_cpd. This permits us to persist the database knowledge past the lifetime of the container.
Now that we’ve created our PostgreSQL occasion, we will execute queries in opposition to it. Importing our knowledge into PostgreSQL requires executing two queries in opposition to the database:
- Creating the desk that may retailer the info.
- Loading our reworked knowledge into that desk.
Every time we execute a question in opposition to our PostgreSQL occasion, we have to do the next:
- Set up our connection to PostgreSQL.
- Execute the question.
- Commit the adjustments & shut the connection.
from prefect import job
from sqlalchemy import create_engine
import psycopg2
from dotenv import load_dotenv
import os
# learn content material from .env, which incorporates our Postgres credentials
load_dotenv()
def create_postgres_table():
'''
Create the cpd_incidents desk in Postgres DB (cpd_db) if it would not exist.
'''
# set up connection to DB
conn = psycopg2.join(
host="localhost",
port="5433",
database="cpd_db",
person=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD")
)
# create cursor object to execute SQL
cur = conn.cursor()
# execute question to create the desk
create_table_query = '''
CREATE TABLE IF NOT EXISTS cpd_incidents (
date_time TIMESTAMP,
id INTEGER PRIMARY KEY,
sort TEXT,
subtype TEXT,
location TEXT,
description TEXT,
last_updated TIMESTAMP,
12 months INTEGER,
month INTEGER,
day INTEGER,
hour INTEGER,
minute INTEGER,
second INTEGER
)
'''
cur.execute(create_table_query)
# commit adjustments
conn.commit()
# shut cursor and connection
cur.shut()
conn.shut()
@job
def load_into_postgres(df):
'''
Masses the reworked knowledge handed in as a DataFrame
into the 'cpd_incidents' desk in our Postgres occasion.
'''
# create desk to insert knowledge into as needed
create_postgres_table()
# create Engine object to connect with DB
engine = create_engine(f"postgresql://{os.getenv("POSTGRES_USER")}:{os.getenv("POSTGRES_PASSWORD")}@localhost:5433/cpd_db")
# insert knowledge into Postgres DB into the 'cpd_incidents' desk
df.to_sql('cpd_incidents', engine, if_exists='change')
Issues to notice concerning the code above:
- Much like how we fetched our app token for extracting our knowledge, we’ll fetch our Postgres credentials from a .env file.
- To load the DataFrame containing our reworked knowledge into Postgres, we’ll use the pandas.DataFrame.to_sql(). It’s a easy method to insert DataFrame knowledge into any database supported by SQLAlchemy.
Defining the Information Pipeline
We’ve applied the person elements of the ETL course of. Now, we’re able to encapsulate these elements right into a pipeline.
There are a lot of instruments obtainable to make use of for orchestrating pipelines outlined in python. Two widespread choices are Apache Airflow and Prefect.
For it’s simplicity, we’ll proceed with defining our pipeline utilizing Prefect. We have to do the next to get began:
- Set up Prefect in our growth atmosphere.
- Get a Prefect API server. Since we don’t wish to handle our personal infrastructure to run Prefect, we’ll join for Prefect Cloud.
For extra data on Prefect setup, try the docs.
Subsequent, we should add the next decorators to our code:
- @job -> Add this to every perform that implements a element of our ETL pipeline (i.e. our extract, validate, remodel, and cargo features).
- @circulate -> Add this decorator to the perform that encapsulates the ETL elements into an executable pipeline.
In the event you look again at our extract, validate, remodel, and cargo code, you’ll see that we added the @job decorator to those features.
Now, let’s outline our ETL pipeline that executes these duties. We’ll put the next in a separate file, etl_pipeline.py.
from extract import extract_data
from validate import validate_data
from remodel import transform_data
from load import load_into_postgres
from prefect import circulate
@circulate(title="cpd_incident_etl", log_prints=True) # Our pipeline will seem as 'cpd_incident_etl' within the Prefect UI. All print outputs shall be displayed in Prefect.
def etl():
'''
Execute the ETL pipeline:
- Extract CPD incident knowledge from the Socrata API
- Validate and remodel the extracted knowledge to organize it for storage
- Import the reworked knowledge into Postgres
'''
print("Extracting knowledge...")
extracted_df = extract_data()
print("Performing knowledge high quality checks...")
validated_df = validate_data(extracted_df)
print("Performing knowledge transformations...")
transformed_df = transform_data(validated_df)
print("Importing knowledge into Postgres...")
load_into_postgres(transformed_df)
print("ETL full!")
if __name__ == "__main__":
# CPD knowledge is anticipated to be up to date every day (https://knowledge.cambridgema.gov/Public-Security/Each day-Police-Log/3gki-wyrb/about_data)
# Thus, we'll execute our pipeline every day (at midnight)
etl.serve(title="cpd-pipeline-deployment", cron="0 0 * * *")
Issues to notice concerning the code:
- @circulate(title=”cpd_incident_etl”, log_prints=True) -> this names our pipeline “cpd_incident_etl”, which shall be mirrored within the Prefect UI. The output of all our print statements shall be logged in Prefect.
- etl.serve(title=”cpd-pipeline-deployment”, cron=”0 0 * * *”) -> this creates a deployment of our pipeline, named “cpd-pipeline-deployment”, that runs day by day at midnight.


Now that we’ve created our pipeline to load our knowledge into PostgreSQL, it’s time to visualise it.
There are a lot of approaches we may take to visualise our knowledge. Some notable choices embrace:
Each are good choices. With out going into an excessive amount of element behind every BI instrument, we’ll use Metabase to make a easy dashboard.
- Metabase is an open-source BI and embedded analytics instrument that makes knowledge visualization and evaluation easy.
- Connecting Metabase to our knowledge sources and deploying it’s simple, in comparison with different BI instruments (ex: Apache Superset).
Sooner or later, if we wish to have extra customization over our visuals/stories, we will think about using different instruments. For now, Metabase will do for making a POC.
Metabase lets you select between utilizing its cloud model or managing a self-hosted occasion. Metabase Cloud offeres a number of fee plans, however you may create a self-hosted occasion of Metabase without cost utilizing Docker. We’ll outline our Metabase occasion in our compose file.
- Since we’re self-hosting, we additionally should outline the Metabase utility database, which incorporates the metadata that Metabase wants to question your knowledge sources (in our case, postgres_cpd).
companies:
postgres_cpd: # postgres occasion for CPD ETL pipeline
picture: postgres:16
container_name: postgres_cpd_dev
atmosphere:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: cpd_db
ports:
- "5433:5432" # Postgres is already on port 5432 on my native machine
volumes:
- pgdata_cpd:/var/lib/postgresql/knowledge
restart: unless-stopped
networks:
- metanet1
pgadmin:
picture: dpage/pgadmin4
container_name: pgadmin_dev
atmosphere:
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
ports:
- "8081:80"
depends_on:
- postgres_cpd
networks:
- metanet1
metabase: # taken from https://www.metabase.com/docs/newest/installation-and-operation/running-metabase-on-docker
picture: metabase/metabase:newest
container_name: metabase
hostname: metabase
volumes:
- /dev/urandom:/dev/random:ro
ports:
- "3000:3000"
atmosphere:
MB_DB_TYPE: postgres
MB_DB_DBNAME: metabaseappdb
MB_DB_PORT: 5432
MB_DB_USER: ${METABASE_DB_USER}
MB_DB_PASS: ${METABASE_DB_PASSWORD}
MB_DB_HOST: postgres_metabase # should match container title of postgres_mb (Metabase Postgres occasion)
networks:
- metanet1
healthcheck:
check: curl --fail -I http://localhost:3000/api/well being || exit 1
interval: 15s
timeout: 5s
retries: 5
postgres_mb: # postgres occasion for managing Metabase occasion
picture: postgres:16
container_name: postgres_metabase # different companies should use this title to speak with this container
hostname: postgres_metabase # inner identifier, would not influence communication with different companies (useful for logs)
atmosphere:
POSTGRES_USER: ${METABASE_DB_USER}
POSTGRES_DB: metabaseappdb
POSTGRES_PASSWORD: ${METABASE_DB_PASSWORD}
ports:
- "5434:5432"
volumes:
- pgdata_mb:/var/lib/postgresql/knowledge
networks:
- metanet1
# Right here, we'll outline separate volumes to isolate DB configuration & knowledge recordsdata for every Postgres database.
# Our Postgres DB for our utility ought to retailer its config/knowledge individually from the Postgres DB our Metabase service depends on.
volumes:
pgdata_cpd:
pgdata_mb:
# outline the community over which all of the companies will talk
networks:
metanet1:
driver: bridge # TO DO: 'bridge' is the default community - companies will be capable of talk with one another utilizing their service names
To create our Metabase occasion, we made the next adjustments to our compose file:
- Added two companies: metabase (our Metabase occasion) and postgres_mb (our Metabase occasion’s utility database).
- Outlined an extra quantity, pgdata_mb. It will retailer the info for the Metabase utility database (postgres_mb).
- Outlined the community over which the companies will talk, metanet1.
With out going into an excessive amount of element, let’s break down the metabase and postgres_mb companies.
Our Metabase occasion (metabase):
- This service shall be uncovered on port 3000 on the host machine and inside the container. If we’re working this service on our native machine, we’ll be capable of entry it at localhost:3000.
- We join Metabase to it’s utility database by guaranteeing that the MB_DB_HOST, MB_DB_PORT, and MB_DB_NAME atmosphere variables match up with the container title, ports, and database title listed beneath the postgres_mb service.
For extra data on learn how to run Metabase in Docker, try the docs.
After organising Metabase, you’ll be prompted to attach Metabase to your knowledge supply.

After choosing a PostgreSQL knowledge supply, we will specify the next connection string to attach Metabase to our PostgreSQL occasion, substituting your credentials as needed:
postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@postgres_cpd:5432/cpd_db

After organising the connection, we will create our dashboard. You’ll be able to create a vast number of visuals in Metabase, so we received’t go into the specifics right here.
Let’s revisit the instance dashboard that we displayed initially of this put up. This dashboard properly summarizes latest and historic tendencies in reported CPD incidents.

From this dashboard, we will see the next:
- Most incidents are reported to the CPD within the mid-late afternoon.
- An amazing majority of reported incidents are of the “INCIDENT” sort.
- The variety of reported incidents peaked round August-October of 2025, and has been reducing steadily ever since.
Fortunately for us, Metabase will question our database each time we load this dashboard, so we received’t have to fret about this dashboard displaying stale knowledge.
Try the Git repo right here if you wish to dive deeper into the implementation.
Wrap-up and Future Work
Thanks for studying! Let’s briefly recap what we constructed:
- We constructed an information pipeline to extract, remodel, and cargo Cambridge Police Log knowledge right into a self-hosted PostgreSQL database.
- We deployed this pipeline utilizing Prefect and scheduled it to run every day.
- We created a self-hosted occasion of Metabase, related it to our PostgreSQL database, and created a dashboard to visualise latest and historic crime tendencies in Cambridge, MA.
There are a lot of methods to construct upon this mission, together with however not restricted to:
- Creating further visualizations (geospatial heatmap) to visualise crime frequencies in several areas inside Cambridge. This might require remodeling our road location knowledge into latitude/longitude coordinates.
- Deploying our self-hosted pipeline and companies off of our native machine.
- Think about becoming a member of this knowledge with different datasets for insightful cross-domain evaluation. As an example, maybe we may be a part of this dataset to demographic/census knowledge (utilizing road location) to see whether or not areas of various demographic make-up inside Cambridge have totally different incident charges.
If in case you have some other concepts for learn how to lengthen upon this mission, otherwise you would’ve constructed issues in another way, I’d love to listen to it within the feedback!
The creator has created all photos on this article.
Sources & GitHub
Prefect:
Metabase:
Docker:
GitHub Repo:
CPD Each day Police Log Dataset:
















