Introduction to Cloud Pricing
I am looking forward to speaking at PyConLT 2025 in two weeks.
It's been a while (12 years!) since my last Python conference, EuroPython Florence 2012, when I spoke as a Django web developer, although I did give a Golang talk at KubeCon USA last year.
I work at EDB, the Postgres company, on our Postgres AI product, the cloud version of which runs across the main cloud providers: AWS, Azure and GCP.
The team I am in handles the identity management and billing components of the product. So whilst I am mainly a Golang micro-service developer, I have dipped my toe into Data Science, having rewritten our cloud prices ETL using Python & Airflow - the subject of my talk in Lithuania.
Cloud pricing can be surprisingly complex ... and the price lists are not small.
The full price lists for the 3 CSPs together come to almost 5 million prices, known as SKUs (Stock Keeping Unit prices):
csp x service x type x tier x region
3 x 200 x 50 x 3 x 50 = 4.5 million

- csp = AWS, Azure and GCP
- service = VMs, k8s, network, load balancer, storage, etc.
- type = e.g. storage - general purpose E2, N1 ... accelerated A1, A2, multiplied by various property sizes
- tier = T-shirt size tiers of usage, i.e. more use = cheaper rate - small, medium, large
- region = us-east-1, us-west-2, af-south-1, etc.
We need to gather all the latest SKUs for the services that our Postgres AI product may use and total them up as a cost estimate for when customers are selecting the various options for creating or adding to their installation. As part of this process we apply the additional pricing for our product and any private offer discounts for it.
Therefore we needed to build a data pipeline to gather the SKUs and keep them current.
Previously we used data from a 3rd party kubecost-based provider; however, our usage was not sufficient to justify paying for this particular cloud service once its free usage expired.
Hence we needed to rewrite our cloud pricing data pipeline. This pipeline is in Apache Airflow but it could equally be in Dagster or any other data pipeline framework.
My talk deals with the wider points around cloud pricing, refactoring a data pipeline and pipeline framework options. But here I want to provide more detail on the data pipeline's Python code, its use of Embedded Postgres and Click, and the benefits for development and testing. Some things I didn't have room for in the talk.
Outline of our use of Data Pipelines
Notably, these frameworks provide a local development mode for running up the pipeline framework locally and doing test runs.
Even with some reloading on edit, running up a pipeline and then executing the full set of steps, known as a directed acyclic graph (DAG), can still be a long process.
One way to improve the DevX is to encapsulate each DAG step's code as much as possible: removing the use of shared state where that is viable, and allowing individual steps to be tested separately and rapidly with fixture data, with fast stand up and tear down of temporary embedded storage.
To avoid persisting shared state across the whole pipeline, we do extract, transform, load (ETL) within each step rather than across the pipeline as a whole. This enables functional running and testing of individual steps outside the pipeline.
The Scraper Class
We need a standard scraper class to fetch the cloud prices from each CSP, so we use an abstract base class:
from abc import ABC


class BaseScraper(ABC):
    """Abstract base class for Scrapers"""

    batch = 500
    conn = None
    unit_map = {"FAIL": ""}
    root_url = ""

    def map_units(self, entry, key):
        """To standardize naming of units between CSPs"""
        return self.unit_map.get(entry.get(key, "FAIL"), entry[key])

    def scrape_sku(self):
        """Scrapes prices from CSP bulk JSON API - uses CSP specific methods"""
        pass

    def bulk_insert_rows(self, rows):
        """Bulk insert batches of rows - Note that Psycopg >= 3.1 uses pipeline mode"""
        query = """INSERT INTO api_price.infra_price VALUES
        (%(sku_id)s, %(cloud_provider)s, %(region)s, … %(sku_name)s, %(end_usage_amount)s)"""
        with self.conn.cursor() as cur:
            cur.executemany(query, rows)
This has 3 common methods:
- mapping units to common ones across all CSPs
- a top level scrape_sku method, with CSP-specific differences handled in sub-methods called from it
- bulk insert rows - the main concrete method used by all scrapers
To bulk insert 500 rows per query we use Psycopg 3 pipeline mode, so it can send batch after batch without waiting for a response.
The database update against the local embedded Postgres is faster than the time it takes to scrape the SKUs from the remote web site.
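To make the pattern concrete, here is a hypothetical cut-down subclass (the URL, JSON field names and row columns are illustrative assumptions, not our real scraper) showing how scrape_sku batches rows into groups of 500 and hands them to bulk_insert_rows:

import requests


class AWSScraper(BaseScraper):
    """Hypothetical cut-down scraper - the JSON layout and columns are illustrative"""

    root_url = "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws"
    unit_map = {"Hrs": "hours", "GB-Mo": "gb_month", "FAIL": ""}

    def __init__(self, conn):
        self.conn = conn  # psycopg connection to the local embedded Postgres

    def scrape_sku(self):
        """Page through a bulk price list, flushing rows in batches of 500"""
        index = requests.get(f"{self.root_url}/AmazonS3/current/index.json").json()
        rows = []
        for sku_id, product in index.get("products", {}).items():
            attrs = product.get("attributes", {})
            rows.append(
                {
                    "sku_id": sku_id,
                    "cloud_provider": "aws",
                    "region": attrs.get("regionCode", ""),
                    "sku_name": attrs.get("usagetype", ""),
                    # remaining infra_price columns elided; unit names would be
                    # normalized with self.map_units(...)
                }
            )
            if len(rows) >= self.batch:
                self.bulk_insert_rows(rows)
                rows = []
        if rows:
            self.bulk_insert_rows(rows)  # flush the final partial batch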
The largest part of the Extract is done at this point. Rather than loading all 5 million SKUs, as we did with the kubecost data dump, and then querying out the 120 thousand relevant to our product, by scraping the sources directly we only need to ingest those 120k SKUs - which saves handling 97.6% of the data!
So the resultant speed is sufficient, although not as performant as loading a pg_dump, which uses COPY.
Unfortunately Python Psycopg is significantly slower when using cursor.copy, and that militated against using zipped up Postgres dumps. Hence all the data artefact creation and loading simply uses the pg_dump utility wrapped as a Python shell command.
There is no need to use Python here when there is the tried and tested C based pg_dump utility, which also ensures compatibility outside our pipeline - a later version of pg_dump can always handle earlier Postgres dumps.
We don't need to retain a long history of artefacts, since it is public data and never needs to be reverted.
This allows a low retention level, cleaning out most of the old dumps on creation of a new one, so any storage saving from compression is negligible.
Therefore we avoid pg_dump compression, since it can be significantly slower, especially if the data already contains compressed blobs. Plain SQL COPY also allows for data inspection if required - e.g. grep for a SKU when debugging why a price may be missing.
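For illustration, the wrapper can be as simple as a subprocess call; the host, user, schema and database names below are assumptions rather than our exact command:

import subprocess


def dump_prices(port: int, outfile: str = "price.sql") -> None:
    """Dump the price schema from the local embedded Postgres as plain, uncompressed SQL."""
    subprocess.run(
        [
            "pg_dump",
            "--host=localhost",
            f"--port={port}",
            "--username=postgres",    # assumed embedded Postgres superuser
            "--schema=api_price",     # the price schema the scrapers write to
            "--format=plain",         # plain SQL with COPY, no compression, grep-able
            f"--file={outfile}",
            "postgres",               # assumed database name
        ],
        check=True,
    )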
Postgres Embedded wrapped with Go
Python doesn't have a maintained wrapper for Embedded Postgres; sadly the https://github.com/Simulmedia/pyembedpg project is abandoned 😢
Hence we use the most up to date wrapper, which is written in Go, and run the Go binary via a Python shell command. It still lags behind by a version of Postgres, so it is on Postgres 16 rather than the latest, 17, but for the purposes of embedded use that is irrelevant.
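As a very rough sketch, the shell wrapper needs to pick a free port, start the Go binary against a temporary data directory, and tear it down afterwards. The binary name and flags below are placeholders; the real interface depends on how the Go wrapper is built:

import socket
import subprocess
import tempfile


def start_embedded_pg() -> tuple[str, int]:
    """Start a temporary embedded Postgres on a random free port (sketch)."""
    with socket.socket() as sock:
        sock.bind(("localhost", 0))  # ask the OS for a free port
        port = sock.getsockname()[1]
    folder = tempfile.mkdtemp(prefix="pgembed_")
    # "embedded-postgres" is a placeholder name for the compiled Go wrapper binary
    subprocess.Popen(["embedded-postgres", f"--port={port}", f"--data-dir={folder}"])
    return folder, port


def stop_embedded_pg(folder: str) -> None:
    """Stop the embedded Postgres and discard its temporary data directory (sketch)."""
    subprocess.run(["embedded-postgres", "--stop", f"--data-dir={folder}"], check=False)
    subprocess.run(["rm", "-rf", folder], check=False)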
By using a separate temporary Postgres per step, we can save a dumped SQL artefact at the end of a step and need no data dependency between steps, meaning individual step retry, in parallel, just works.
The performance of a localhost dump to socket is also superior. By processing everything in the same (if embedded) version of Postgres as the final target database of our Cloud Price Go micro-service, we remove any SQL compatibility issues and ensure full PostgreSQL functionality is available.
The final data artefacts will be loaded into the price schema of that micro-service's Postgres cluster, running on CloudNativePG.
Use a Click wrapper with Tests
The click package provides all the functionality for our pipeline's command line interface.
> pscraper -h
Usage: pscraper [OPTIONS] COMMAND [ARGS]...

  price-scraper: python web scraping of CSP prices for api-price

Options:
  -h, --help  Show this message and exit.

Commands:
  awsscrape    Scrape prices from AWS
  azurescrape  Scrape prices from Azure
  delold       Delete old blob storage files, default all over 12 weeks old are deleted
  gcpscrape    Scrape prices from GCP - set env GCP_BILLING_KEY
  pgdump       Dump postgres file and upload to cloud storage - set env STORAGE_KEY
               > pscraper pgdump --port 5377 --file price.sql
  pgembed      Run up local embedded PG on a random port for tests
               > pscraper pgembed
  pgload       Load schema to local embedded postgres for testing
               > pscraper pgload --port 5377 --file price.sql
This caters for developing and debugging the step code entirely outside the pipeline.
We can run pgembed to create a local db and pgload to add the price schema, then run individual scrapes from a pipenv pip install -e version of the price scraper package.
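For illustration, a minimal sketch of wiring up such a Click command group is below; the option handling is simplified and pgload just shells out to psql, which is an assumption rather than our exact implementation:

import subprocess

import click


@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def cli():
    """price-scraper: python web scraping of CSP prices for api-price"""


@cli.command()
@click.option("--port", type=int, required=True, help="Port of the local embedded Postgres")
@click.option("--file", "file_", default="price.sql", help="SQL dump file to load")
def pgload(port, file_):
    """Load schema to local embedded postgres for testing"""
    # Sketch: load the plain SQL dump with psql against the embedded instance
    subprocess.run(
        ["psql", "--host=localhost", f"--port={port}", "--username=postgres",
         f"--file={file_}", "postgres"],
        check=True,
    )


if __name__ == "__main__":
    cli()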
For unit testing we can create a mock response object for the data scrapers that returns different fixture payloads based on the query, and monkeypatch it in. This allows us to functionally test the whole scrape and data artefact creation ETL cycle as unit tests.
Any issues with source data changes can be replicated via a fixture for regression tests.
from unittest import TestCase

import psycopg
import requests

# `fixtures` (a mapping of URL substring to JSON payload), `MockConn` and `ROOT`
# are defined elsewhere in the test module


class MockResponse:
    """Fake to return fixture value of requests.get() for testing scrape parsing"""

    name = "Mock User"
    payload = {}
    content = ""
    status_code = 200
    url = "http://mock_url"

    def __init__(self, payload={}, url="http://mock_url"):
        self.url = url
        self.payload = payload
        self.content = str(payload)

    def json(self):
        return self.payload


def mock_aws_get(url, **kwargs):
    """Return the fixture JSON that matches the URL used"""
    for key, fix in fixtures.items():
        if key in url:
            return MockResponse(payload=fix, url=url)
    return MockResponse()


class TestAWSScrape(TestCase):
    """Tests for the 'pscraper awsscrape' command"""

    @classmethod
    def setUpClass(cls):
        """Simple monkeypatch in mock handlers for all tests in the class"""
        psycopg.connect = MockConn
        requests.get = mock_aws_get
        # confirm that requests is patched hence returns short fixture of JSON from the AWS URLs
        result = requests.get("{}/AmazonS3/current/index.json".format(ROOT))
        assert len(result.json().keys()) > 5 and len(result.content) < 2000
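MockConn is not shown above; a hypothetical version only needs to satisfy the two psycopg calls the scraper makes, cursor() as a context manager and executemany, for example:

class MockCursor:
    """Collects the rows that bulk_insert_rows would have written (hypothetical sketch)"""

    rows = []

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False

    def executemany(self, query, rows):
        MockCursor.rows.extend(rows)


class MockConn:
    """Stand-in for psycopg.connect(...) in tests (hypothetical sketch)"""

    def __init__(self, *args, **kwargs):
        pass

    def cursor(self):
        return MockCursor()

    def commit(self):
        pass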
A simple DAG with Soda Data validation
The click commands for each DAG step are imported at the top, one for the scrape and one for Postgres embedded. The DAG step just becomes a wrapper to run them, adding Soda data validation of the scraped data ...
def scrape_azure():
    """Scrape Azure via API public json web pages"""
    from price_scraper.commands import azurescrape, pgembed
    folder, port = setup_pg_db(PORT)
    error = azurescrape.run_azure_scrape(port, HOST)
    if not error:
        error = csp_dump(port, "azure")
    if error:
        pgembed.teardown_pg_embed(folder)
        notify_slack("azure", error)
        raise AirflowFailException(error)

    data_test = SodaScanOperator(
        dag=dag,
        task_id="data_test",
        data_sources=[
            {
                "data_source_name": "embedpg",
                "soda_config_path": "price-scraper/soda/configuration_azure.yml",
            }
        ],
        soda_cl_path="price-scraper/soda/price_azure_checks.yml",
    )
    data_test.execute(dict())
    pgembed.teardown_pg_embed(folder)
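For completeness, wiring the function into the pipeline is plain Airflow; a minimal sketch might look like the following, where the dag_id, start date and schedule are placeholders and the exact DAG arguments depend on the Airflow version:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="cloud_price_scraper",     # placeholder id
    start_date=datetime(2024, 1, 1),  # placeholder date
    schedule="@weekly",               # assumed refresh cadence
    catchup=False,
) as dag:
    scrape_azure_task = PythonOperator(
        task_id="scrape_azure",
        python_callable=scrape_azure,
    )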
"""Scrape Azure via API public json web pages"""
from price_scraper.commands import azurescrape, pgembed
folder, port = setup_pg_db(PORT)
error = azurescrape.run_azure_scrape(port, HOST)
if not error:
error = csp_dump(port, "azure")
if error:
pgembed.teardown_pg_embed(folder)
notify_slack("azure", error)
raise AirflowFailException(error)
data_test = SodaScanOperator(
dag=dag,
task_id="data_test",
data_sources=[
{
"data_source_name": "embedpg",
"soda_config_path": "price-scraper/soda/configuration_azure.yml",
}
],
soda_cl_path="price-scraper/soda/price_azure_checks.yml",
)
data_test.execute(dict())
pgembed.teardown_pg_embed(folder)
We set up a new embedded Postgres (it takes a few seconds) and then scrape directly into it.
We then use the SodaScanOperator to check the data we have scraped: if there is no error we dump to blob storage, otherwise we notify Slack with the error and raise it, ending the DAG.
Our Soda tests check that the number of prices, and the prices themselves, are in the ranges they should be for each service. We also check that we have the number of tiered rates we expect - over 10 starting usage rates and over 3000 specific tiered prices.
If the Soda tests pass, we dump to cloud storage and tear down the temporary Postgres. A final step aggregates together each step's data. We save the money and maintenance of running a persistent database cluster in the cloud for our pipeline.