
Ingesting large CSV files into BigQuery via Cloud Run Job

13 min read · Mar 17, 2025

I have published articles about Java and Spring Boot on this platform, but this time I am covering a fascinating and different topic. Recently, I started a new job as a Data Engineer. As a team, we work on Big Data in GCP (Google Cloud Platform) using BigQuery and other GCP tools. After this article, I will start to write about Spring Cloud and GCP.

Throughout this article, we will focus on ingesting large CSV files into BigQuery in a generic way. Let’s refresh our coffees and get started ☕ 📰😊.

Story Time

Yesterday, Isengard, one of the largest e-commerce companies in Middle-earth, faced increasing data management problems as its business volume grew. CSV files sent by customers and suppliers came in different formats and data types. These files had to be manually saved into the company’s existing systems. This process was both time-consuming and error-prone. In particular, problems such as misinterpreted data types, incomplete data entries and format incompatibilities were seriously reducing the company’s operational efficiency.

Saruman calls the people to the data automation fight

Saruman, who worked in the Data Integration department, initiated a comprehensive automation project to solve these problems, and he convinced people to implement this solution with a fantastic speech. As part of this project, a system was designed to automatically process CSV files from customers and suppliers as soon as they are uploaded to Cloud Storage. This system was integrated with a service called Cloud Run Job. The Cloud Run Job is triggered automatically when files are uploaded, and the schema of each file is verified using the Pydantic library and a Data Contract. In this way, the format and type of the data are checked automatically, preventing incorrect data from entering the system.

The validated data is then transferred to BigQuery, Google Cloud’s fully managed data warehouse service. This platform allows Isengard to analyse large amounts of data quickly and effectively. In addition to storing data, BigQuery offers functions such as analysing data with SQL-based queries, reporting, and preparing data for machine learning models.

Why Automation?

Reducing Error Rate: Errors made during manual data entry have been largely eliminated with automation. Tools such as Pydantic automatically validate data types and formats, preventing incorrect data entry.

Time Saving: Manual processes are time-consuming, especially when dealing with large data sets. Automation accelerated this process, enabling data to be processed and analysed instantly.

Scalability: As the company grew, so did the volume of data. Automation offered a scalable solution to manage this increasing data volume. Cloud-based solutions allowed resources to be automatically increased when needed.

Data Integrity and Security: Automation enabled data to be processed more securely. Services such as Cloud Storage and BigQuery automatically provided security measures such as encryption and backup of data.

Design in GCP

Design

Let’s talk about the design and components.

During this process, we have five crucial components in GCP: Cloud Storage, Eventarc, Workflow, Cloud Run Job, and BigQuery. Let's break down these components step by step.

Why Eventarc?

When a CSV file is uploaded to Cloud Storage, Eventarc detects this event and triggers a workflow. Eventarc is a modern solution for event-driven architectures in GCP. When a file is created in Cloud Storage (for example, the storage.objects.create event), Eventarc can listen for this event and trigger another service (for example, Workflow or Pub/Sub).

Why Workflow?

Once triggered by Eventarc, Workflow coordinates the steps and runs the Cloud Run Job as required.

Workflow allows you to combine multiple steps (e.g. file check, run Cloud Run Job, retry in case of error) and execute them sequentially or conditionally. Since we want a generic solution to handle 30 different CSV files, Workflow can loop through these files, call the Cloud Run Job for each one, and provide additional features such as parallel execution or error handling if required.

Additionally, Pub/Sub (Eventarc creates a Pub/Sub topic behind the scenes) sets an acknowledgement time limit of 10 minutes for messages, which can be problematic for long-running jobs (e.g. processing a 1 GB CSV). If you triggered a Cloud Run Job directly from Pub/Sub and the job took longer than 10 minutes, the message could be queued again. Workflow is ideal for overcoming this kind of limitation.

Basic Workflow

Check File Regex -> Log Input Data -> Send Notification -> Trigger Cloud Run Job -> Send Success Notification -> Finish (Retry if it fails)

Why Cloud Run Job (Why Not Cloud Run?)

Cloud Run Job is designed for one-off tasks that run for a certain period of time. In our case, processing each CSV file is a ‘batch’ operation and does not need to run as a continuous web service (Cloud Run). While Cloud Run is a service that responds to HTTP requests, Cloud Run Job has the logic of a job that executes a command and finishes.

We will focus on what a batch job is in later articles on Spring Boot.

Data Contract

Another really important topic for processing large files is data contracts. I will not go into detail about what a data contract is because the project’s website provides a clear explanation.

Reasons:

1-) Processing 30 CSV files, each with a different schema, is a difficult task. The Data Contract CLI allows you to define a data contract for each file. These contracts can specify data schemas (e.g. field names, data types, mandatory fields) and quality requirements (e.g. number of rows, null value checking) in YAML format. This enables centralised documentation and standardisation of the structure of each file.

2-) The schema of the CSV files could change in the future, and both the code itself and the BigQuery schema need to be adapted to these changes. In this case, the Data Contract comes into play by regenerating the required Pydantic object (our entities) and the BigQuery table schema.

Why the Pydantic Library?

The reasons for preferring Pydantic in this scenario (processing 30 different CSV files and saving them to BigQuery), and for using Pydantic objects, can be evaluated in terms of both technical advantages and practical benefits.

Reasons:

1-) Data from CSV files can often be unstructured or semi-structured. Each row may have different data types (string, integer, float, date, etc.), and this data may be incorrect (for example, a negative value in an ‘age’ column or an invalid format in an ‘email’ column). Pydantic automatically validates this data using Python type hints; see the short sketch after this list.

2-) The Pydantic model is easy to generate with the required fields based on the data contract YAML file, so we don’t have to add new fields to the object manually (saving development time).
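
Below is a minimal sketch of the first reason, assuming Pydantic v2. The UserRow model and the row values are hypothetical and exist only to show how an invalid value is rejected.

# A minimal, hypothetical sketch of Pydantic validation (not the article's generated model).
from pydantic import BaseModel, ValidationError

class UserRow(BaseModel):
    username: str
    age: int  # Pydantic will try to coerce the CSV string into an integer

try:
    UserRow.model_validate({"username": "gandalf", "age": "not-a-number"})
except ValidationError as exc:
    print(exc)  # reports that 'age' is not a valid integer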

Coding Time

We already know which components we need and what we need to do. However, we don’t yet know how to create and bind these resources: Bucket, Eventarc, Workflow, and Cloud Run Job. In these typical cases, Terraform gives us a big playground to create and manage our resources. I am also not going to go deep into Terraform and its syntax because it is not the focus of this topic. You can find more information by clicking here.

Terraform Configuration

You need to enable the Cloud Resource Manager API, Cloud Run API, Cloud Storage API, Eventarc API and Workflows API. Otherwise, terraform apply will fail.

terraform {
  required_version = ">= 1.0.0"

  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 4.0"
    }
  }
}

data "google_project" "project" {
  project_id = "beratyesbek-test" # please do not forget to replace with your project ID
}

provider "google" {
  credentials = file("beratyesbek-test-188958b60b87.json") # your service account
  project     = "beratyesbek-test"
  region      = "europe-west1"
}

resource "google_storage_bucket" "large-csv-file-bucket" {
  location = "europe-west1"
  name     = "large-csv-file-bucket"
  project  = "beratyesbek-test"
}

# This workflow will be run when a CSV file is uploaded.
resource "google_workflows_workflow" "large-csv-file-workflow" {
  name   = "large-csv-file-workflow"
  region = "europe-west1"
  # replace with your own service account
  service_account = "beratyesbek-test@beratyesbek-test.iam.gserviceaccount.com"
  source_contents = file("workflows/large-csv-file-workflow.yaml")
}

# GCS needs to be allowed to publish Pub/Sub messages when a file is uploaded.
# Otherwise terraform apply might throw an error.
resource "google_project_iam_member" "gcs_service_account_pubsub_publisher" {
  project = "beratyesbek-test"
  role    = "roles/pubsub.publisher"
  member  = "serviceAccount:service-${data.google_project.project.number}@gs-project-accounts.iam.gserviceaccount.com"
}

# Eventarc triggers the relevant workflow by listening for file upload events on GCS.
resource "google_eventarc_trigger" "large-csv-file-trigger" {
  location = "europe-west1"
  name     = "large-csv-file-trigger"
  project  = "beratyesbek-test"
  # replace with your own service account
  service_account = "beratyesbek-test@beratyesbek-test.iam.gserviceaccount.com"

  matching_criteria {
    attribute = "type"
    value     = "google.cloud.storage.object.v1.finalized"
  }

  matching_criteria {
    attribute = "bucket"
    value     = google_storage_bucket.large-csv-file-bucket.name
  }

  destination {
    workflow = google_workflows_workflow.large-csv-file-workflow.name
  }

  depends_on = [google_project_iam_member.gcs_service_account_pubsub_publisher]
}

# Creating a Cloud Run job to upload the CSV file to BigQuery.
resource "google_cloud_run_v2_job" "cr-large-csv-file-job" {
  location = "europe-west1"
  name     = "cr-large-csv-file-job"
  project  = "beratyesbek-test"

  template {
    template {
      # replace with your own service account
      service_account = "beratyesbek-test@beratyesbek-test.iam.gserviceaccount.com"
      containers {
        image = "europe-west1-docker.pkg.dev/beratyesbek-test/berat-repository/big-query-csv-loader:1"
      }
    }
  }
}

You need to apply the Terraform configuration:

terraform init
terraform plan
terraform apply

Workflow YAML File:

main:
  params: [event]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - event_bucket: ${event.data.bucket}
          - event_file: ${event.data.name}
          - target_bucket: ${"input-" + project_id}
          - job_name: cr-large-csv-file-job # job name
          - job_location: europe-west1 # replace your region
    - run_job:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${"namespaces/" + project_id + "/jobs/" + job_name}
          location: ${job_location}
          body:
            overrides:
              containerOverrides:
                env:
                  - name: INPUT_BUCKET
                    value: ${event_bucket}
                  - name: INPUT_FILE
                    value: ${event_file}
        result: job_execution
    - finish:
        return: ${job_execution}

Example file structure

This Terraform code describes an infrastructure that automates the processing of large CSV files on Google Cloud. The system includes a workflow that is automatically triggered when a CSV file is uploaded to Google Cloud Storage (GCS). This process allows the file to be uploaded to BigQuery using Google Cloud Workflows, Eventarc and Cloud Run components.

Conclusion For Terraform

This Terraform code integrates GCS, Workflows, Eventarc and Cloud Run services to enable automated processing of large CSV files in the Google Cloud environment. The operation of the system can be summarised as follows:

  • A CSV file is uploaded to a GCS bucket.
  • The Eventarc trigger detects the file upload event and runs the Workflows workflow.
  • Workflows processes the file by calling the corresponding Cloud Run job.
  • The Cloud Run container receives the CSV file and uploads it to BigQuery.

Python Code and Data Contract

As mentioned earlier, a Data Contract is an agreement between the parties exchanging data that specifies the data format, structure and validation rules. In this scenario:

- Sender: The system providing the CSV file
- Receiver: Pydantic object (for data validation and processing) and BigQuery (data storage destination)

We will create a one-time data contract for each incoming CSV file. In accordance with this contract:

1. A Pydantic object will be created by the Data Contract CLI generator to check the structure and correctness of the data in the CSV file.
2. A BigQuery schema will be created by the Data Contract CLI to store the data in a format suitable for BigQuery.

This process is a critical step to ensure the consistency and reliability of the data.
You need to install the Data Contract CLI.

If you go to the GitHub source code, you will see the users.csv file as an example. We need to generate a datacontract.yaml file based on a CSV file. See the command below:

datacontract import --format csv --source users.csv --output datacontract.yaml

When you run this command, you will see this output:

dataContractSpecification: 1.1.0
id: beratyesbek-datacontract
info:
  title: Berat Data Contract
  description: A data contract defining the structure and constraints for user data stored in a CSV file.
  version: 0.0.1
servers:
  production:
    type: bigquery
    project: beratyesbek-test
    dataset: users_dataset
    table: users
models:
  users:
    description: A table representing user information stored in a CSV file with ASCII encoding.
    type: table
    title: Users
    fields:
      username:
        type: string
        description: The unique username of the user.
        required: true
      firstname:
        type: string
        description: The user's first name.
        required: true
      lastname:
        type: string
        description: The user's last name.
        required: true
      language:
        type: string
        description: The preferred language of the user (e.g., 'en', 'es', 'fr').
        required: false
      address_id:
        type: integer
        description: A foreign key referencing the unique identifier of the user's address in an external addresses table.
        required: false
      contact_id:
        type: integer
        description: A foreign key referencing the unique identifier of the user's contact details in an external contacts table.
        required: false

Generating BigQuery Schema based on Data Contract

datacontract export --format bigquery --server production --output users.json

After generating the BigQuery schema, you can create a table in the related dataset.
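
As a side note (not part of the original steps), the table could also be created programmatically. The sketch below assumes that the exported users.json contains a plain BigQuery field list readable by Client.schema_from_json and that the users_dataset dataset already exists.

# Hypothetical sketch: create the target table from the exported schema file.
from google.cloud import bigquery

client = bigquery.Client(project="beratyesbek-test")
schema = client.schema_from_json("users.json")  # assumes a BigQuery field list
table = bigquery.Table("beratyesbek-test.users_dataset.users", schema=schema)
client.create_table(table, exists_ok=True)  # no-op if the table already exists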

We need another jsonschema to generate our Pydantic object:

datacontract export --format jsonschema --output users.json

After you run this command, you will get the users.json file, which helps you to generate your Pydantic object. We need one more installation:

pip install datamodel-code-generator

Generate the Pydantic model:

datamodel-codegen  --input users.json --input-file-type jsonschema --output model.py

Generated User Model

# generated by datamodel-codegen:
# filename: users.json
# timestamp: 2025-03-16T10:29:59+00:00

from __future__ import annotations

from typing import Optional

from pydantic import BaseModel, Field


class Users(BaseModel):
    username: str = Field(..., description='The unique username of the user.')
    firstname: str = Field(..., description="The user's first name.")
    lastname: str = Field(..., description="The user's last name.")
    language: Optional[str] = Field(
        None, description="The preferred language of the user (e.g., 'en', 'es', 'fr')."
    )
    address_id: Optional[int] = Field(
        None,
        description="A foreign key referencing the unique identifier of the user's address in an external addresses table.",
    )
    contact_id: Optional[int] = Field(
        None,
        description="A foreign key referencing the unique identifier of the user's contact details in an external contacts table.",
    )

Explanation of Python Code

Cloud Run Job steps

In this data ingestion pipeline, a Cloud Run Job is triggered by a Google Cloud Workflow, which passes container overrides specifying the name and bucket of an incoming file stored in Cloud Storage. These overrides act as parameters to identify the file’s location. The Cloud Run Job then retrieves the file (a blob) from Cloud Storage and uses the smart_open library to access it. This library is chosen for its lightweight nature and its ability to stream the file directly without loading it fully into memory, ensuring memory safety by preventing potential overflows or crashes, especially with large files. Once the file is opened, the data is processed in batches rather than all at once to efficiently ingest it into BigQuery. This batch processing approach optimizes resource usage and ensures reliable, scalable data transfer to BigQuery, where the data is ultimately stored for analysis.

Batch Processing

‘Batch’ refers to a data processing method. In this method, instead of processing data individually or in a continuous stream, data is grouped into batches and processed as such. Especially when working with large amounts of data, instead of loading all the data into memory at once, you divide the data into smaller, manageable chunks and process these chunks sequentially. For example, when importing millions of rows from a file into BigQuery, instead of sending them all at once, you send them in batches of 10,000 rows. This approach uses memory efficiently, improves performance, and prevents system overload. In short, batch processing provides an organised and controlled process flow by processing data in batches.
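
As a small illustration of the idea (and a possible fallback, since itertools.batched used in main.py below requires Python 3.12+), a batching helper can be sketched with itertools.islice:

# A sketch of batch processing: only `size` rows are held in memory at a time.
import itertools
from typing import Iterable, Iterator

def batched(rows: Iterable[dict], size: int) -> Iterator[list[dict]]:
    it = iter(rows)
    while chunk := list(itertools.islice(it, size)):
        yield chunk

rows = ({"id": i} for i in range(25))  # stand-in for csv.DictReader
for batch in batched(rows, 10):
    print(len(batch))  # prints 10, 10, 5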

main.py

import os
from google.cloud import bigquery, storage
from smart_open import open
from model import Users
import logging
import csv
import itertools

PROJECT_ID = "beratyesbek-test"
DATASET_ID = "users_dataset"
TABLE_ID = "users"


def main():
    bucket_name = os.environ.get("INPUT_BUCKET")
    file_name = os.environ.get("INPUT_FILE")
    start_process(bucket_name, file_name)


def start_process(bucket_name, file_name):
    uri = f"gs://{bucket_name}/{file_name}"
    # smart_open streams the blob from GCS instead of loading it fully into memory
    with open(uri, "r") as blob_stream:
        reader = csv.DictReader(blob_stream)
        # itertools.batched requires Python 3.12+
        for batch in itertools.batched(reader, 10000):
            process_batch(batch)
    logging.info(f"Data loaded from {uri} to BigQuery table {PROJECT_ID}.{DATASET_ID}.{TABLE_ID}")


def process_batch(batch: tuple):
    data_object = []
    for row in batch:
        user = Users.model_validate(row, strict=False)
        data_object.append(user.model_dump())
    save_bigquery(data_object)
    data_object.clear()


def save_bigquery(objects):
    client = bigquery.Client(project=PROJECT_ID)
    table = client.get_table(f"{DATASET_ID}.{TABLE_ID}")
    client.insert_rows_json(table, objects)
    logging.info(f"Data loaded to BigQuery table {PROJECT_ID}.{DATASET_ID}.{TABLE_ID}")


if __name__ == '__main__':
    main()
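
One detail worth noting: insert_rows_json returns a list of per-row errors instead of raising on failure. A hedged variant of save_bigquery that checks the result could look like this sketch (not the article's code):

# Hypothetical hardening of save_bigquery: fail loudly if any row is rejected.
from google.cloud import bigquery

PROJECT_ID = "beratyesbek-test"
DATASET_ID = "users_dataset"
TABLE_ID = "users"

def save_bigquery_checked(objects: list[dict]) -> None:
    client = bigquery.Client(project=PROJECT_ID)
    table = client.get_table(f"{DATASET_ID}.{TABLE_ID}")
    errors = client.insert_rows_json(table, objects)  # empty list on success
    if errors:
        raise RuntimeError(f"BigQuery streaming insert failed: {errors}")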

model.py:

# generated by datamodel-codegen:
# filename: users.json
# timestamp: 2025-03-16T10:29:59+00:00

from __future__ import annotations

from typing import Optional

from pydantic import BaseModel, Field, model_validator


class Users(BaseModel):
    username: str = Field(..., description='The unique username of the user.')
    firstname: str = Field(..., description="The user's first name.")
    lastname: str = Field(..., description="The user's last name.")
    language: Optional[str] = Field(
        None, description="The preferred language of the user (e.g., 'en', 'es', 'fr')."
    )
    address_id: Optional[int] = Field(
        None,
        description="A foreign key referencing the unique identifier of the user's address in an external addresses table.",
    )
    contact_id: Optional[int] = Field(
        None,
        description="A foreign key referencing the unique identifier of the user's contact details in an external contacts table.",
    )

    @model_validator(mode="before")
    def handle_blank_strings(cls, data: dict) -> dict:
        """Convert empty strings to None for all fields"""
        if isinstance(data, dict):
            for key, value in data.items():
                if value == "":
                    data[key] = None
        return data

When you manually upload a file to a Google Cloud Storage bucket, Eventarc automatically detects the upload event. This triggers a predefined workflow, which in turn activates a Cloud Run Job. The Cloud Run Job then processes the uploaded file and saves the data to BigQuery.
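
To try the pipeline end to end, you can simply upload the sample file. The sketch below uses the google-cloud-storage client and assumes the bucket name from the Terraform configuration above.

# Hypothetical test upload: drop users.csv into the bucket and watch the workflow run.
from google.cloud import storage

client = storage.Client(project="beratyesbek-test")
bucket = client.bucket("large-csv-file-bucket")
bucket.blob("users.csv").upload_from_filename("users.csv")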

Bucket
Workflow
Cloud Run Job

BigQuery Result

https://github.com/BeratYesbek/bigquery-ingesting-large-files

This article is intended to offer a perspective, not to teach :)))

Berat Yesbek