Ed Crewe Home

Monday, 7 April 2025

Talk about Cloud Prices at PyConLT 2025


Introduction to Cloud Pricing

I am looking forward to speaking at PyConLT 2025
My talk is called Cutting the Price of Scraping Cloud Costs

It's been a while (12 years!) since my last Python conference, EuroPython Florence 2012, when I spoke as a Django web developer, although I did give a Golang talk at KubeCon USA last year.

I work at EDB, the Postgres company, on our Postgres AI product, the cloud version of which runs across the main cloud providers: AWS, Azure and GCP.

The team I am in handles the identity management and billing components of the product. So whilst I am mainly a Golang micro-service developer, I have dipped my toe into Data Science, having rewritten our Cloud prices ETL using Python & Airflow, which is the subject of my talk in Lithuania.

Cloud pricing can be surprisingly complex ... and the price lists are not small.

The full price lists for the 3 CSPs together are almost 5 million prices - known as SKUs (Stock Keeping Unit prices)

csp x service x type x tier x region
3   x 200     x 50   x 3    x 50     = 4.5 million

csp = AWS, Azure and GCP

service = vms, k8s, network, load balancer, storage etc.

type = e.g. storage - general purpose E2, N1 ... accelerated A1, A2  multiplied by various property sizes

tier  = T-shirt size tiers of usage, ie more use = cheaper rate - small, medium, large

region = us-east-1, us-west-2, af-south-1, etc.
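
As a rough check of the arithmetic above, the estimate is simply the product of the dimension sizes. The counts below are the round figures quoted in this post, not measured values, and the real number of combinations varies by provider.

```python
from math import prod

# Rough dimension sizes quoted in the post; real counts vary by provider.
dimensions = {
    "csp": 3,        # AWS, Azure, GCP
    "service": 200,
    "type": 50,
    "tier": 3,
    "region": 50,
}

total_skus = prod(dimensions.values())
print(f"{total_skus:,} SKUs")  # 4,500,000 SKUs
```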

We need to gather all the latest service SKUs that our Postgres AI may use and total them up as a cost estimate for when customers are selecting the various options for creating or adding to their installation.
As part of this process we apply the additional pricing for our product and any private offer discounts for it.

Therefore we needed to build a data pipeline to gather the SKUs and keep them current.

Previously we used data from a 3rd party, kubecost based provider; however our usage was not sufficient to justify paying for this particular cloud service when its free usage expired.

Hence we needed to rewrite our cloud pricing data pipeline. This pipeline is in Apache Airflow but it could equally be in Dagster or any other data pipeline framework.

My talk deals with the wider points around cloud pricing, refactoring a data pipeline and pipeline framework options. But here I want to provide more detail on the data pipeline's Python code, its use of Embedded Postgres and Click, and the benefits for development and testing.  Some things I didn't have room for in the talk.


Outline of our use of Data Pipelines

Airflow, Dagster, etc. provide many tools for pipeline development, notably a local development mode for running up the pipeline framework locally and doing test runs.
Even with some reloading on edit, it can still be a long process, running up a pipeline and then executing the full set of steps, known as a directed acyclic graph (DAG).

One way to improve the developer experience (DevX) is to encapsulate each DAG step's code as much as possible.
That means removing use of shared state where that is viable and allowing individual steps to be tested separately and rapidly with fixture data, with fast stand up and tear down of temporary embedded storage.

To avoid shared state persistence across the whole pipeline we use extract transform load (ETL) within each step, rather than across the whole pipeline. This enables functional running and testing of individual steps outside the pipeline.
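
A minimal sketch of what ETL within a single step can look like, so that the step can be run and tested on its own with fixture data. The payload shape, the field names and the use of a plain list as the temporary store are all assumptions for illustration; our real steps write to a temporary embedded Postgres.

```python
def extract(payload):
    """Pull the raw price items out of a scraped payload (hypothetical shape)."""
    return payload.get("Items", [])


def transform(items):
    """Keep only the fields the step needs, normalising the price to a float."""
    return [
        {"sku_id": item["id"], "price": float(item["price"])}
        for item in items
        if "id" in item and "price" in item
    ]


def load(rows, store):
    """Write rows to the step's own temporary store and report how many were written."""
    store.extend(rows)
    return len(rows)


def run_step(payload, store):
    """One pipeline step: a complete extract, transform, load cycle with no shared state."""
    return load(transform(extract(payload)), store)


# Fixture data stands in for the remote price list; a list stands in for the temp database.
fixture = {"Items": [{"id": "sku-1", "price": "0.12"}, {"id": "sku-2"}]}
store = []
loaded = run_step(fixture, store)
```

Because run_step takes everything it needs as arguments, a test can call it directly with a fixture and inspect the store afterwards, with no pipeline framework running.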


The Scraper Class

We need a standard scraper class to fetch the cloud prices from each CSP, so we use an abstract base class.


from abc import ABC


class BaseScraper(ABC):
    """Abstract base class for Scrapers"""

    batch = 500
    conn = None
    unit_map = {"FAIL": ""}
    root_url = ""

    def map_units(self, entry, key):
        """To standardize naming of units between CSPs"""
        # A missing key falls back to "FAIL", which unit_map maps to an empty string.
        # A unit with no mapping is returned unchanged.
        unit = entry.get(key, "FAIL")
        return self.unit_map.get(unit, unit)

    def scrape_sku(self):
        """Scrapes prices from CSP bulk JSON API - uses CSP specific methods"""
        pass

    def bulk_insert_rows(self, rows):
        """Bulk insert batches of rows - Note that Psycopg >= 3.1 uses pipeline mode"""
        query = """INSERT INTO api_price.infra_price VALUES
        (%(sku_id)s, %(cloud_provider)s, %(region)s, %(sku_name)s, %(end_usage_amount)s)"""
        with self.conn.cursor() as cur:
            cur.executemany(query, rows)


This has 3 common methods:

  1. Mapping units to common ones across all CSPs
  2. A top level scrape SKU method, with the CSP differences handled in sub methods called from it
  3. Bulk insert rows - the main concrete method used by all scrapers
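
To illustrate the unit mapping idea on its own, here is a standalone sketch written as a plain function rather than the class method. The unit names and the unitOfMeasure key are hypothetical examples, not the real mappings we use.

```python
def map_units(unit_map, entry, key):
    """Return the standard unit name for entry[key], or the original value if unmapped."""
    value = entry.get(key, "FAIL")
    return unit_map.get(value, value)


# Hypothetical mapping from one provider's unit names to a common naming.
unit_map = {"FAIL": "", "1 Hour": "Hrs", "1 GB/Month": "GB-Mo"}

mapped = map_units(unit_map, {"unitOfMeasure": "1 Hour"}, "unitOfMeasure")
unmapped = map_units(unit_map, {"unitOfMeasure": "10K"}, "unitOfMeasure")
missing = map_units(unit_map, {}, "unitOfMeasure")
```

A known unit is translated, an unknown one passes through untouched, and an entry with no unit at all ends up as an empty string via the "FAIL" key.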

To bulk insert 500 rows per query we use Psycopg 3 pipeline mode - so it can send batch updates again and again without waiting for response.
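
A small sketch of how rows might be grouped into batches of 500 before each bulk insert. The batches helper is an assumption for illustration, not code from our scraper.

```python
from itertools import islice


def batches(rows, size=500):
    """Yield successive lists of up to `size` rows from any iterable."""
    iterator = iter(rows)
    while chunk := list(islice(iterator, size)):
        yield chunk


# 1,200 fake rows split into batches of 500, 500 and 200.
rows = ({"sku_id": n} for n in range(1200))
sizes = [len(batch) for batch in batches(rows)]
```

Each batch could then be handed to bulk_insert_rows, so that rows are never all held in memory at once.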

The database update against local embedded Postgres is faster than the time to scrape the remote web site SKUs.


The largest part of the Extract is done at this point. With the kubecost data dump we loaded all 5 million SKUs and then queried out the 120 thousand for our product. By scraping the sources directly we only need to ingest those 120k SKUs, which saves handling 97.6% of the data!


So the resultant speed is sufficient although not as performant as pg_dump loading which uses COPY.


Unfortunately Python Psycopg is significantly slower when using cursor.copy, and that militated against using zipped up Postgres dumps. Hence all the data artefact creation and loading simply uses the pg_dump utility wrapped as a Python shell command.

There is no need to use Python here when there is the tried and tested C based pg_dump utility for it that ensures compatibility outside our pipeline. Later version pg_dump can always handle earlier Postgres dumps.
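
Wrapping pg_dump as a Python shell command could look something like the sketch below. The function names, the postgres user and database name and the error handling are assumptions. The schema, port and file name are taken from elsewhere in this post, and plain format is used so that the dump is uncompressed SQL.

```python
import subprocess


def pg_dump_command(port, outfile, host="localhost", user="postgres",
                    dbname="postgres", schema="api_price"):
    """Build the argument list for a plain SQL dump of one schema."""
    return [
        "pg_dump",
        f"--host={host}",
        f"--port={port}",
        f"--username={user}",
        f"--dbname={dbname}",
        f"--schema={schema}",
        "--format=plain",   # uncompressed SQL that can be grepped
        f"--file={outfile}",
    ]


def run_pg_dump(port, outfile):
    """Run pg_dump and return an error string, or None on success."""
    result = subprocess.run(pg_dump_command(port, outfile),
                            capture_output=True, text=True)
    return result.stderr if result.returncode else None


cmd = pg_dump_command(5377, "price.sql")
```

Keeping the command building separate from running it means the arguments can be unit tested without a Postgres install.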


We don't need to retain a long history of artefacts, since it is public data and never needs to be reverted.

This allows us a low retention level, cleaning out most of the old dumps on creation of a new one. So any storage saving on compression is negligible.
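
A sketch of that kind of clean out, using a local folder as a stand-in for blob storage. The delete_old function and file names are hypothetical; the 12 week default matches the delold command described further down.

```python
import os
import tempfile
import time
from pathlib import Path

TWELVE_WEEKS = 12 * 7 * 24 * 60 * 60  # seconds


def delete_old(folder, max_age=TWELVE_WEEKS, now=None):
    """Delete *.sql dumps older than max_age seconds; return the deleted names."""
    now = time.time() if now is None else now
    deleted = []
    for path in sorted(Path(folder).glob("*.sql")):
        if now - path.stat().st_mtime > max_age:
            path.unlink()
            deleted.append(path.name)
    return deleted


# Demonstrate on a temp folder with one fresh dump and one aged by 13 weeks.
with tempfile.TemporaryDirectory() as folder:
    fresh, stale = Path(folder, "new.sql"), Path(folder, "old.sql")
    fresh.write_text("-- new")
    stale.write_text("-- old")
    aged = time.time() - 13 * 7 * 24 * 60 * 60
    os.utime(stale, (aged, aged))
    removed = delete_old(folder)
    remaining = [p.name for p in Path(folder).iterdir()]
```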

Therefore we avoid pg_dump compression, since it can be significantly slower, especially if the data already contains compressed blobs. Plain SQL COPY also allows for data inspection if required - eg grep for a SKU, when debugging why a price may be missing.


Postgres Embedded wrapped with Go

Unlike MySQL, Postgres doesn't do in memory databases. The equivalent, for a temporary or test run database lifetime, is the embedded version of Postgres, run from an auto-created temp folder of files.
Python doesn't have a maintained wrapper for Embedded Postgres; sadly the project https://github.com/Simulmedia/pyembedpg is abandoned 😢

Hence we use the most up to date wrapper, which is from Go, running the Go binary via a Python shell command.
It still lags behind by a version of Postgres, so it's on Postgres 16 rather than the latest 17.
But for the purposes of embedded use that is irrelevant.

By using a separate temporary Postgres per step we can save a dumped SQL artefact at the end of a step and need no data dependency between steps, meaning that retrying individual steps, or running them in parallel, just works.
The performance of localhost dump to socket is also superior.
By processing everything in the same (if embedded) version of Postgres as the final target database of the Cloud Price Go micro-service, we remove any SQL compatibility issues and ensure full PostgreSQL functionality is available.
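
Giving each step its own temporary Postgres needs a free port and a fresh data folder per step. The sketch below shows one way to pick the port and build the launch command. The ./pgembed binary name and its --port and --data-dir flags are hypothetical stand-ins for the Go wrapper, not its real interface.

```python
import socket


def free_port(host="127.0.0.1"):
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


def embed_command(port, folder, binary="./pgembed"):
    """Argument list for a hypothetical Go wrapper binary and its flags."""
    return [binary, f"--port={port}", f"--data-dir={folder}"]


port = free_port()
cmd = embed_command(port, "/tmp/pg-step-azure")
```

Note that the port is only known to be free at the moment it is checked, so another process could in theory grab it before Postgres starts. For short lived test databases that risk is usually acceptable.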

The final data artefacts will be loaded to a Postgres cluster price schema micro-service running on CloudNativePG.

Use a Click wrapper with Tests

The click package provides all the functionality for our pipeline.

> pscraper -h

Usage: pscraper [OPTIONS] COMMAND [ARGS]...

   price-scraper: python web scraping of CSP prices for api-price

Options:

  -h, --help  Show this message and exit.


Commands:

  awsscrape    Scrape prices from AWS
  azurescrape  Scrape prices from Azure
  delold       Delete old blob storage files, default all over 12 weeks old are deleted
  gcpscrape    Scrape prices from GCP - set env GCP_BILLING_KEY
  pgdump       Dump postgres file and upload to cloud storage - set env STORAGE_KEY
  pgembed      Run up local embeddedPG on a random port for tests
  pgload       Load schema to local embedded postgres for testing

Example usage:

> pscraper pgdump --port 5377 --file price.sql
> pscraper pgembed
> pscraper pgload --port 5377 --file price.sql


This caters for developing the step code entirely outside the pipeline for development and debug.
We can run pgembed to create a local db, then pgload to add the price schema. Then we run individual scrapes from a pipenv pip install -e version of the price scraper package.


For unit testing we can create a mock response object for the data scrapers that returns different fixture payloads based on the query and monkeypatch it in. This allows us to functionally test the whole scrape and data artefact creation ETL cycle as unit functional tests.

Any issues with source data changes can be replicated via a fixture for regression tests.

import psycopg
import requests
from unittest import TestCase

# fixtures, MockConn and ROOT are assumed to be defined elsewhere in the test package


class MockResponse:
    """Fake to return fixture value of requests.get() for testing scrape parsing"""

    name = "Mock User"
    payload = {}
    content = ""
    status_code = 200
    url = "http://mock_url"

    def __init__(self, payload=None, url="http://mock_url"):
        self.url = url
        # avoid a shared mutable default argument
        self.payload = payload or {}
        self.content = str(self.payload)

    def json(self):
        return self.payload


def mock_aws_get(url, **kwargs):
    """Return the fixture JSON that matches the URL used"""
    for key, fix in fixtures.items():
        if key in url:
            return MockResponse(payload=fix, url=url)
    return MockResponse()


class TestAWSScrape(TestCase):
    """Tests for the 'pscraper awsscrape' command"""

    @classmethod
    def setUpClass(cls):
        """Simple monkeypatch in mock handlers for all tests in the class"""
        psycopg.connect = MockConn
        requests.get = mock_aws_get
        # confirm that requests is patched hence returns short fixture of JSON from the AWS URLs
        result = requests.get("{}/AmazonS3/current/index.json".format(ROOT))
        assert len(result.json().keys()) > 5 and len(result.content) < 2000

A simple DAG with Soda Data validation

The click commands for each DAG are imported at the top, one for the scrape and one for postgres embedded. The DAG then just becomes a wrapper to run them, adding Soda data validation of the scraped data ...

def scrape_azure():
   """Scrape Azure via API public json web pages"""
   from price_scraper.commands import azurescrape, pgembed
   folder, port = setup_pg_db(PORT)
   error = azurescrape.run_azure_scrape(port, HOST)
   if not error:
       error = csp_dump(port, "azure")
   if error:
       pgembed.teardown_pg_embed(folder) 
       notify_slack("azure", error)
       raise AirflowFailException(error)
  
   data_test = SodaScanOperator(
       dag=dag,
       task_id="data_test",
       data_sources=[
           {
               "data_source_name": "embedpg",
               "soda_config_path": "price-scraper/soda/configuration_azure.yml",
           }
       ],
       soda_cl_path="price-scraper/soda/price_azure_checks.yml",
   )
   data_test.execute(dict())
   pgembed.teardown_pg_embed(folder)
 


We set up a new Embedded Postgres (takes a few seconds) and then scrape directly to it.


We then use the SodaScanOperator to check the data we have scraped. If there is no error we dump to blob storage, otherwise we notify Slack with the error and raise it, ending the DAG.

Our Soda tests check that the number of SKUs and their prices are in the ranges that they should be for each service. We also check we have the number of tiered rates that we expect. We expect over 10 starting usage rates and over 3000 specific tiered prices.

If the Soda tests pass, we dump to cloud storage and tear down the temporary Postgres. A final step aggregates together each step's data. We save the money and maintenance of running a persistent database cluster in the cloud for our pipeline.
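
To give a feel for the kind of range checks involved, here is the same idea expressed in plain Python rather than in Soda's own check language. The validate function, the row fields and the thresholds are all assumptions for illustration, not our real Soda checks.

```python
def validate(rows, min_rows, max_price):
    """Return failure messages for a scraped price set; empty means it passed."""
    failures = []
    if len(rows) <= min_rows:
        failures.append(f"row count {len(rows)} is not over {min_rows}")
    out_of_range = [r["sku_id"] for r in rows if not 0 <= r["price"] <= max_price]
    if out_of_range:
        failures.append(f"price out of range for {out_of_range}")
    return failures


rows = [
    {"sku_id": "a", "price": 0.05},
    {"sku_id": "b", "price": 1.20},
    {"sku_id": "c", "price": -3.00},  # a bad value the check should catch
]
failures = validate(rows, min_rows=2, max_price=100.0)
```

An empty list of failures would let the step carry on, while any message would be sent on as the error.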


Sunday, 22 September 2024

What is JSON transcoding?

A tool that enables use of gRPC as a single API for microservices and REST web frontend, by automated translation of your gRPC API into JSON RESTful services.


What is gRPC?

gRPC is the name of a language agnostic data transfer framework designed for cloud microservices implemented via HTTP2, created by Google, around the same time they released Kubernetes.
It is to some extent equivalent to the REST standard that was developed over HTTP1.1 and used to transfer data for web browsers. But REST uses plain text JSON, whereas gRPC encodes messages as binary protobuf format data.

There are similarities between gRPC development and REST development, hence similar tools.
For scripted API interactions REST has Postman which also has some gRPC support, but there is also Kreya, grpcUI etc.

Why use gRPC rather than REST?

gRPC can be up to 10 x faster than REST, depending on the payload, and is best suited to cross microservice remote procedure calls.

gRPC uses protoc to generate any or all of Go, Python, C#, C++, Java, Node.js, PHP, Ruby, Dart, Kotlin & Rust code out of the box - so allowing your microservice engineers to use native Go, your SREs Python and other integrators their language of choice. Each language can import the autogenerated API libraries for it, which are generated from the *.proto source files that developers create to define the API.

You have a single versioned API defined in one place for all services. This may be a dedicated myservices-proto package with all your source/*.proto files in, and a script to run protoc and generate the libraries for each language your company uses, along with the master API definition file, eg. descriptor/proto.pb or proto.pbtext for the bigger human readable version.

The gRPC protocol is strongly typed and allows full validation of data in and out. It is a binary format so more compact. JSON via REST does not allow such control of the data typing and validation. It was designed as a simple serialisation format for the objects of JavaScript, the dynamically typed HTML page scripting language, not as a performant backend RPC protocol for cloud microservices.
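
To make the compactness point concrete, the sketch below hand encodes a message with a single string field, numbered 1, in the protobuf wire format and compares it with the equivalent JSON. It is only an illustration of the encoding for this one field type. Real code would use the classes generated by protoc, and the field name value is an assumption.

```python
import json


def encode_varint(value):
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string_field(field_number, text):
    """Encode one length-delimited (wire type 2) string field."""
    data = text.encode("utf-8")
    tag = encode_varint((field_number << 3) | 2)
    return tag + encode_varint(len(data)) + data


binary = encode_string_field(1, "hello")
as_json = json.dumps({"value": "hello"}, separators=(",", ":")).encode("utf-8")
print(len(binary), len(as_json))  # 7 17
```

The binary form carries the field number rather than the field name, which is where most of the saving comes from for small messages.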

Because of the much looser standards around REST and JSON many people adopt the Swagger framework to help standardize and document their REST API, whilst gRPC has formal standards in its core protocols.

Why use JSON transcoding?


Why still use REST too?

The first question that occurs is why run a REST API at all? The reason is that whilst it uses large, slow and minimally typed messaging, it is the default standard approach for front end web development. Since web front ends are implemented in Javascript it is natural to build them against a RESTful backend that provides data in the native JSON format.

Added to this, even the latest web browsers do not give Javascript the fine-grained control over HTTP2 requests that gRPC requires. This in turn leads to poor support in the Javascript eco-system. Also for devx accessibility, gRPC messages are not immediately human readable.

Perhaps most importantly gRPC was never designed to replace REST. It was designed for fast backend cloud deployed internal service composition.
REST is a web protocol designed for loosely coupling services from across the web, where the loose standards, typing, simplicity and transparency of REST / JSON complement HTML5. Given that attempts to impose stricter typing, such as XHTML, were a failure, using gRPC as a front end replacement for REST is asking for trouble. Instead, standardizing REST via Swagger, OpenAPI and the like is a more practical approach.

The front-end web world loves REST. But gRPC is far superior for more tightly coupled backend microservices. Given that, JSON transcoding is likely to remain a very useful means of saving on API proliferation and complexity by providing a single API for both, via your edge servers (ie the servers between your cloud deployed services and the internet).


How does JSON transcoding work?

JSON transcoding suits a gRPC centric single master API design. The ideal approach when designing new cloud services built from micro-services. 

It is implemented by using the transcoder plugin, which can run in existing proxy servers such as Envoy Proxy. The plugin can use gRPC proto source files to autogenerate a JSON REST API without any code generation required.

Alternatively there is grpc-gateway which generates the implementation from the proto files and requires a compilation step. This different implementation under the hood is not strictly transcoding. But in effect it does the same job.

Or for the Microsoft language world, there is the JSON transcoder for ASP.NET.

In all cases you create the REST API based on the google.api.http annotations standard in your *.proto files, by adding the annotations import and the option (google.api.http) block to a simple example service, as below ...

syntax = "proto3";
package your.service.v1;
option go_package = "github.com/yourorg/yourprotos/gen/go/your/service/v1";

import "google/api/annotations.proto";

 message StringMessage {
   string value = 1;
 }

service YourService {
   rpc Echo(StringMessage) returns (StringMessage) {
      option (google.api.http) = {
          post: "/v1/example/echo"
          body: "*"
      };
   }
}

The google.api.http set of annotations are used to define how the gRPC method can be called via REST. 
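
From the front end's point of view, the annotated Echo method is then just a JSON POST. The sketch below builds such a request with the Python standard library. The localhost:8080 address is an assumption about where a transcoding proxy might be listening, and echo_request is a hypothetical helper. Since the annotation uses body: "*", the whole JSON body maps onto the StringMessage fields.

```python
import json
from urllib.request import Request


def echo_request(value, base_url="http://localhost:8080"):
    """Build the REST request that the annotation maps onto the Echo rpc."""
    body = json.dumps({"value": value}).encode("utf-8")
    return Request(
        f"{base_url}/v1/example/echo",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = echo_request("hello")
# urllib.request.urlopen(req) would send it once a transcoding proxy is listening.
```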


Why use it?

Use a transcoder and you only need to maintain your gRPC and your public REST API in one place - the gRPC proto files - with simple one liner annotations.

You no longer need to develop any REST server code. Running up web servers that provide a REST API, and have to be manually coded to keep it in sync with the backend gRPC API, can be dispensed with. Or if you are running direct REST interfaces from your Golang microservices, these can be dropped as less type safe and more error prone. They can be replaced with gRPC microservices, and the data validation code layers in different backend code languages replaced with proto level data validation in one place. Validation can be your own custom validator code or you can use a full plugin such as buf.build or protoc-gen-validate.

Now as you build your gRPC API you also build a JSON RESTful one too. Adding custom data validators at the gRPC level, defining your full API in one place.

The gRPC API annotations give you a performant REST API that is auto generated and run via Envoy Proxy’s fast C++ based edge server - for use by the JSON front end. Automatically transcoding messages back and forth from REST request / responses to gRPC ones.


What about Transcoding other HTTP content types that are not JSON?


It may be called a JSON transcoder but the transcoder can also transcode other content types.

To transcode other http content types, you must use the google.api.HttpBody type. Put the content in the body and set (and call the UI) with the appropriate content-type header. For example for a gRPC CSV file download, eg. getting log files ...


syntax = "proto3";
package your.service.v1;
option go_package = "github.com/yourorg/yourprotos/gen/go/your/service/v1";

import "google/api/annotations.proto";
import "google/api/httpbody.proto";

 message StringMessage {
   string value = 1;
 }

service YourService {
  rpc GetCSVFile(StringMessage) returns (google.api.HttpBody) {
    option (google.api.http) = {
        get : "/v1/example/echo.csv"
    }; 
  }
}

A Go implementation of the method to return the CSV file might be ...

import (
	"context"
	"embed"

	"google.golang.org/genproto/googleapis/api/httpbody"

	api_v1 "github.com/my-proto-pkg/generated/go/public/v1"
)

//go:embed *.csv
var EmbedFS embed.FS

// GetCSVFile to return a CSV file via gRPC as HttpBody
func (p *Provider) GetCSVFile(ctx context.Context, req *api_v1.StringMessage) (*httpbody.HttpBody, error) {
	// req.Value is the file name passed in as the value query parameter
	csvData, err := EmbedFS.ReadFile(req.Value)
	if err != nil {
		return nil, err
	}
	return &httpbody.HttpBody{
		ContentType: "text/csv",
		Data:        csvData,
	}, nil
}


A call to /v1/example/echo.csv?value=smalldata.csv with content-type=text/csv (or application/json) should return that file.

Other content types such as PDF can be similarly handled.


Data streaming large content types


When returning content other than compact gRPC message formats another issue arises.
What if bigdata.csv is 2 GB in size? The default gRPC upload limit is 4 MB and, although download is unlimited, it is best to stream anything that may be over that size.

For large responses, streaming needs to be used.

It is very simple to switch the protocol for gRPC and the transcoded HTTP REST requests and / or responses. If either is prefixed with the word stream then streaming is implemented both in gRPC and for the transcoded REST API. So to stream 2 GB files from REST, change the proto definition by adding the stream keyword before the response type ...

rpc StreamCSVFile(StringMessage) returns (stream google.api.HttpBody) {

Although this is not the only thing to be done. The main work is that the method implementation needs to stream the data too.

// StreamCsvFile to stream large CSV files via HttpBody
func (p *Provider) StreamCsvFile(req *api_v1.StringMessage, responseStream api_v1.MyProto_StreamCsvFileServer) error {
	f, err := os.Open("/tmp/bigdata.csv")
	if err != nil {
		return err // report the failure rather than ending the stream silently
	}
	defer f.Close()

	r := bufio.NewReader(f)
	buf := make([]byte, 4*1024*1024) // Use 4 MB buffer

	for {
		n, err := r.Read(buf)
		if n > 0 {
			// send whatever was read before checking for EOF
			resp := &httpbody.HttpBody{
				ContentType: "text/csv",
				Data:        buf[:n],
			}
			if err := responseStream.Send(resp); err != nil {
				return err
			}
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
	}
	return nil
}

When the file is fetched it is returned to the browser as streamed HTTP, so in chunks.
If you do this via Envoy and have it in debug mode you can see it being served as a series of chunked responses via streaming HTTP, in response to a REST GET.

1. curl --header 'Content-Type: application/json' http://localhost:8080/v1/example/echo.csv?value=bigdata.csv
2. transcoder translates that request to gRPC http://grpc_host:80/myproto.api.v1.StreamCsvFile
3. The microservice returns a stream of gRPC responses with the file chunked up into them to Envoy
4. Envoy starts serving those chunks as http responses to the curl web client
5. When all the responses are done the task is complete and curl has downloaded the full file and stuck it back together.
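
On the client side, the last step amounts to reading the response a chunk at a time and writing each chunk out. Here is a sketch of that, as an alternative to curl, using an in-memory stream as a stand-in for the HTTP response. The save_stream helper and the chunk sizes are assumptions for illustration.

```python
import io


def save_stream(response, dest, chunk_size=64 * 1024):
    """Copy a streamed response to dest chunk by chunk; return the byte count."""
    total = 0
    while chunk := response.read(chunk_size):
        dest.write(chunk)
        total += len(chunk)
    return total


# An in-memory stream stands in for the HTTP response from the proxy.
fake_response = io.BytesIO(b"id,price\n1,0.12\n" * 1000)
dest = io.BytesIO()
written = save_stream(fake_response, dest, chunk_size=1024)
```

With a real proxy running, the response object returned by urllib.request.urlopen could be passed in instead, with a file opened in binary write mode as dest, so that a 2 GB file never has to fit in memory.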

When tested out locally on my Mac it downloaded files at over 100Mb/sec so transcoding does not appear to introduce any performance hit.

How to try out Envoy with the transcoder filter


Envoy can be downloaded and installed locally to try out JSON transcoding. You just need to be able to run up the gRPC service for it to talk to and provide the proto.pb API definition to it via the Envoy config.yaml

Envoy is easily installed on Linux or Mac

Once it is installed run up your gRPC service or port-forward it from k8s or kind.

Copy its proto.pb to a local path such as protos/helloworld.pb for Envoy to access it. Then run up envoy...

envoy --service-node ingress --service-cluster ingress -c envoy-config.yaml --log-level debug

A sample config for running a service is detailed in the Envoy transcoder help page.
Note that the REST path should not be added to the config with the current version 3 transcoder.
The transcoder reads the paths for REST from the proto.pb.
It just needs the gRPC dot notation based URL /helloworld.Greeter that indicates the gRPC service:

    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: grpc_json
          codec_type: AUTO
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match:
                  prefix: /helloworld.Greeter
                route:
                  cluster: grpc_myproto

When port forwarding from a k8s running service something along the following lines could be used to point Envoy at it. 

clusters:
- name: grpc_myproto
  type: STATIC
  connect_timeout: 5s
  http2_protocol_options: {}
  load_assignment:
    cluster_name: grpc_myproto
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address:
              address: 127.0.0.1
              port_value: 9090

Note that the sample config from the envoy transcoder help page is missing some useful config that you may want in production, eg.  convert_grpc_status: true, so that if gRPC errors occur, they are transcoded and returned in the body of the http response. By default (== false), the body is blank and the gRPC error is only in the header. There is a full list of config options available.

Testing


If you want to write tests against REST in Go (rather than just testing the gRPC) you will need the Go JSON annotated structs to test with, that marshal JSON back and forth. 

Since the REST API is transcoded on the fly by Envoy then these don't exist in any of your source code. To generate them you need to use command line translation tools.

gRPC proto.pb > buf.build > OpenAPIv2
At this point you have a JSON OpenAPI spec which can be used to create your REST tests in Javascript or any language with OpenAPI v2 support.
To do so in Go we can continue the pipeline with OpenAPIv2 > oapi-codegen > generated_v1.go
A test using httpexpect (to clean up the raw syntax of http) would be something like ...

resp := generated_v1.GetCsvResponse{}
e := httpexpect.Default(t, restSvcURL)
e.GET("/v1/example/echo.csv").Expect().Status(http.StatusOK).JSON().Decode(&resp)
assert.Equal(t, sampleCSV, resp.Data)
(this assumes our example GetCsvResponse embeds the google.api.HttpBody as Data and includes other fields)