Previous post: part I

4. Experiments

In the previous section we went through the training process. In reality, training a model requires a series of experiments to find the best set of hyperparameters. It is very important to keep track of the experiment results in a reproducible manner and to set up a unified practice among team members. SageMaker Experiments solves this problem.

SageMaker Experiments has its own Python package you can import with:

from smexperiments.experiment import Experiment

An experiment should be the outermost grouping for a collection of trials. Each trial can map to a specific algorithm you use, such as Random Forest, XGBoost, Logistic Regression, etc.

from smexperiments.trial import Trial

A trial is a series of steps; each step is called a trial component.

from smexperiments.trial_component import TrialComponent

Each trial component can take a combination of inputs such as datasets, algorithms, and parameters, and produce outputs such as models, datasets, metrics, and checkpoints.

Examples of trial components are

  • data preprocessing jobs
  • training jobs
  • batch transform jobs

A trial component can be, for example, the full preprocessing logic, or each step within it such as imputing missing values, one-hot encoding, etc. Each trial component (a trial step) can store information such as the mean and standard deviation of a feature, or the metric you care about during the experiments, so you can compare them.
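Putting the three concepts together, here is a minimal sketch of the hierarchy. The names, the logged parameter, and sagemaker_client (a boto3 SageMaker client) are placeholders for illustration:

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.tracker import Tracker

# Experiment: the outermost grouping, e.g. one per ML problem
demo_experiment = Experiment.create(
    experiment_name="churn-prediction-demo",           # hypothetical name
    description="Demo of the experiment/trial/component hierarchy",
    sagemaker_boto_client=sagemaker_client)

# Trial: one approach under the experiment, e.g. an XGBoost run
demo_trial = Trial.create(
    trial_name="xgboost-baseline",                      # hypothetical name
    experiment_name=demo_experiment.experiment_name,
    sagemaker_boto_client=sagemaker_client)

# Trial component: one step of the trial, created by hand with a Tracker
with Tracker.create(display_name="Preprocessing",
                    sagemaker_boto_client=sagemaker_client) as demo_tracker:
    demo_tracker.log_parameters({"imputation_strategy": "median"})

demo_trial.add_trial_component(demo_tracker.trial_component)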

Next let’s look at examples for training MNIST classifiers using TensorFlow and PyTorch.

4.1 An Experiment Example using TensorFlow 2

We use a custom training script provided by TensorFlow for this use case. Check out the script here.

Note that in _parse_args(), model_dir is passed in from SageMaker and the others come from SageMaker environment variables.

def _parse_args():
    parser = argparse.ArgumentParser()

    # Data, model, and output directories
    # model_dir is always passed in from SageMaker. By default this is a S3 path under the default bucket.
    parser.add_argument('--model_dir', type=str)
    parser.add_argument('--sm-model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAINING'))
    parser.add_argument('--hosts', type=list, default=json.loads(os.environ.get('SM_HOSTS')))
    parser.add_argument('--current-host', type=str, default=os.environ.get('SM_CURRENT_HOST'))

    return parser.parse_known_args()

Here is the main block which is very simple:

if __name__ == "__main__":
    args, unknown = _parse_args()

    train_data, train_labels = _load_training_data(args.train)
    eval_data, eval_labels = _load_testing_data(args.train)

    mnist_classifier = model(train_data, train_labels, eval_data, eval_labels)

    if args.current_host == args.hosts[0]:
        # save model to an S3 directory with version number '000000001'
        mnist_classifier.save(os.path.join(args.sm_model_dir, '000000001'), 'my_model.h5')

Note: the sample data from S3 is in .npy format, which is much more performant than CSV.
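For reference, loading .npy files boils down to numpy.load. Here is a sketch of what _load_training_data might look like; the file names are assumptions, since the actual sample script defines its own:

import os
import numpy as np

def _load_training_data(base_dir):
    # Load the arrays that were uploaded to the training channel
    x_train = np.load(os.path.join(base_dir, 'train_data.npy'))
    y_train = np.load(os.path.join(base_dir, 'train_labels.npy'))
    return x_train, y_train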

import datetime

# Create the Experiment
experiment_name = f"tf-mnist-{datetime.datetime.now().strftime('%Y%m%d%H%M')}"
description = "Classification of mnist hand-written digits using tensorflow 2"

tf_experiment = Experiment.create(experiment_name=experiment_name,
                                  description=description,
                                  sagemaker_boto_client=sagemaker_client)

# Create the Trial
trial_name = f"tf-trial-{datetime.datetime.now().strftime('%Y%m%d%H%M')}"

tf_trial = Trial.create(trial_name=trial_name,
                        experiment_name=tf_experiment.experiment_name,
                        sagemaker_boto_client=sagemaker_client)


from sagemaker.tensorflow import TensorFlow

# Create the training job. This takes 3-5 min
tf_estimator = TensorFlow(
    entry_point='../scripts/tf/tensorflow_mnist.py',
    role=role,
    train_instance_count=1,
    train_instance_type='ml.p2.xlarge',
    code_location=f"s3://{BUCKET}/{PREFIX}",
    output_path=f"s3://{BUCKET}/{PREFIX}",
    base_job_name='tf-mnist',
    py_version='py3',
    framework_version='2.1.0',
    enable_sagemaker_metrics=True)

experiment_config = {
        "TrialName": tf_trial.trial_name,
        "TrialComponentDisplayName": "Training",
    }

# Now associate the estimator with the Experiment and Trial
tf_estimator.fit(inputs={'training': training_data_uri},
                 experiment_config=experiment_config)

Notice that the TensorFlow constructor takes in some additional parameters compared to the previous training jobs: py_version, framework_version, and enable_sagemaker_metrics.

The SageMaker TensorFlow estimator has a legacy mode and a script mode; py_version='py3' indicates that we are using script mode, since legacy mode only supports Python 2. framework_version is the actual TensorFlow version we want to use.

enable_sagemaker_metrics=True enables logging of metrics such as accuracy over time, so that we can see the visualization outside the notebook in SageMaker Studio.

After executing the code above, you can view your experiment metadata by clicking on the beaker icon in the sidebar! It shows a two-column table: experiment name and last modified time. First click the refresh button in this panel, then you’ll see the experiment you just ran. You can also search by experiment name in its search bar.

Double click on the experiment to show the trials under it, then double click on the trial to show the trial components. It looks like this:

SageMaker trial component

The Metrics tab shows the metrics automatically scraped from the logs of our training job. They are configured by the TensorFlow estimator object and the Docker image used. You can explore the other tabs for more metadata.
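If you prefer to stay in the notebook, the same training metrics can also be pulled into a dataframe with sagemaker.analytics.TrainingJobAnalytics; a small sketch, assuming the training job above has finished:

from sagemaker.analytics import TrainingJobAnalytics

# Fetch the metrics that SageMaker scraped from the training job log
metrics_df = TrainingJobAnalytics(
    training_job_name=tf_estimator.latest_training_job.job_name).dataframe()
metrics_df.head()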

4.2 An Experiment Example using PyTorch

Now let’s switch to PyTorch and demonstrate how to run multiple trials with different hyperparameters.

from matplotlib import pyplot as plt
import pandas as pd
from torchvision import datasets, transforms

%config InlineBackend.figure_format = 'retina'

transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize((0.1307,), (0.3081,))])

# Download, load, and transform the data.
train_set = datasets.MNIST(LOCAL_DATA_DIRECTORY, train=True, transform=transform, download=True)
test_set = datasets.MNIST(LOCAL_DATA_DIRECTORY, train=False, transform=transform, download=True)

inputs = sagemaker_session.upload_data(
    path=LOCAL_DATA_DIRECTORY,
    bucket=BUCKET,
    key_prefix=PREFIX)

from smexperiments.tracker import Tracker

with Tracker.create(display_name="Preprocessing", sagemaker_boto_client=sagemaker_client) as tracker:
    tracker.log_parameters({
        "normalization_mean": 0.1307,
        "normalization_std": 0.3081,
    })

    tracker.log_input(name="mnist-dataset", media_type="s3/uri", value=inputs)


experiment_name = f"torch-mnist-{datetime.datetime.now().strftime('%Y%m%d%H%M')}"
description = "Classification of mnist hand-written digits with pytorch."

mnist_experiment = Experiment.create(experiment_name=experiment_name,
                                     description=description,
                                     sagemaker_boto_client=sagemaker_client)


from sagemaker.pytorch import PyTorch

hidden_channel_trial_name_map = {} # Keep references to each Trial object

# If you want to run the following training jobs asynchronously, you may need to increase
# your resource limit. Otherwise, you can run them sequentially.
for i, num_hidden_channel in enumerate([2, 5, 10]):

    # create Trial object
    trial_name = f"torch-{num_hidden_channel}-hidden-channels-{datetime.datetime.now().strftime('%Y%m%d%H%M')}"
    cnn_trial = Trial.create(
        trial_name=trial_name,
        experiment_name=mnist_experiment.experiment_name,
        sagemaker_boto_client=sagemaker_client,
    )
    # Have a dict for variable references to the trial names
    hidden_channel_trial_name_map[num_hidden_channel] = trial_name

    # Associate the preprocessing trial component with the current trial
    cnn_trial.add_trial_component(tracker.trial_component)

    # all input configurations, parameters, and metrics specified in estimator
    # definition are automatically tracked
    estimator = PyTorch(
        entry_point='../scripts/torch/pytorch_mnist.py',
        role=role,
        sagemaker_session=sagemaker_session,
        framework_version='1.1.0',
        train_instance_count=1,
        train_instance_type='ml.c4.xlarge',
        code_location=f"s3://{BUCKET}/{PREFIX}",
        output_path=f"s3://{BUCKET}/{PREFIX}",
        base_job_name='torch-mnist',
        hyperparameters={
            'epochs': 2,
            'backend': 'gloo',
            'hidden_channels': num_hidden_channel,
            'dropout': 0.2,
            'optimizer': 'sgd'
        },
        metric_definitions=[
            {'Name':'train:loss', 'Regex':'Train Loss: (.*?);'},
            {'Name':'test:loss', 'Regex':'Test Average loss: (.*?),'},
            {'Name':'test:accuracy', 'Regex':'Test Accuracy: (.*?)%;'}
        ],
        enable_sagemaker_metrics=True,
    )

    # Now associate the estimator with the Experiment and Trial
    estimator.fit(
        inputs={'training': inputs},
        experiment_config={
            "TrialName": cnn_trial.trial_name,
            "TrialComponentDisplayName": "Training",
        },
        wait=True,
    )

    # give it a while before dispatching the next training job
    time.sleep(2)

The core of the code above is the loop over num_hidden_channel, creating a trial for each value. In the PyTorch estimator we can also specify other hyperparameters and custom regexes for the metric definitions we care about.
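Note that the metric regexes only work if the training script actually prints matching lines. A sketch of what the logging inside pytorch_mnist.py would need to look like (the exact format strings here are assumptions):

# Inside the training/evaluation loop of pytorch_mnist.py
print('Train Loss: {:.6f};'.format(train_loss))
print('Test Average loss: {:.4f}, Test Accuracy: {:.1f}%;'.format(test_loss, test_accuracy))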

This can take about 15 minutes to run. After completion, we can click on the beaker icon in the left bar. We can either inspect each individual trial and look at the charts of training over time, or we can look at all trial components in one chart: select them, right click to open them in a list, then select add chart with Summary Statistics and Scatter Plot. It’s quite powerful to create such visualizations in the browser without having to write any code.

Experiment tracking is extremely important. For a team, it is good practice to use SageMaker Experiments to store and group results so they are easy to access and compare.

4.3 Tracking lineage: finding the best model

We’ve run our experiments; now we can find the best model simply by running some code in the notebook.

from sagemaker.analytics import ExperimentAnalytics

search_expression = {
    "Filters":[
        {
            "Name": "DisplayName",
            "Operator": "Equals",
            "Value": "Training",
        }
    ],
}

trial_component_analytics = ExperimentAnalytics(
    sagemaker_session=sagemaker_session,
    experiment_name=mnist_experiment.experiment_name,
    search_expression=search_expression,
    sort_by="metrics.test:accuracy.max",
    sort_order="Descending",
    metric_names=['test:accuracy'],
    parameter_names=['hidden_channels', 'epochs', 'dropout', 'optimizer']
)

trial_component_analytics.dataframe()

We will see a dataframe like the one below:

trial comps

Recall that we have the dictionary hidden_channel_trial_name_map holding references to the trials; we can use it to take a closer look at the best trial:

lineage_table = ExperimentAnalytics(
    sagemaker_session=sagemaker_session,
    search_expression={
        "Filters":[{
            "Name": "Parents.TrialName",
            "Operator": "Equals",
            "Value": hidden_channel_trial_name_map[2]
        }]
    },
    sort_by="CreationTime",
    sort_order="Ascending",
)

lineage_table.dataframe()

Now we see a lot more information about it:

lineage

SageMaker also offers more advanced hyperparameter tuning capabilities that you can dig into.
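As a hedged sketch of what that looks like, a HyperparameterTuner could replace the manual loop above and search the hidden_channels and dropout ranges automatically; the ranges and job counts below are arbitrary choices:

from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter

tuner = HyperparameterTuner(
    estimator=estimator,                      # the PyTorch estimator defined above
    objective_metric_name='test:accuracy',
    objective_type='Maximize',
    hyperparameter_ranges={
        'hidden_channels': IntegerParameter(2, 16),
        'dropout': ContinuousParameter(0.1, 0.5),
    },
    metric_definitions=[
        {'Name': 'test:accuracy', 'Regex': 'Test Accuracy: (.*?)%;'}
    ],
    max_jobs=6,
    max_parallel_jobs=2)

tuner.fit({'training': inputs})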

5. Deployment

Deployment is an area that’s usually outside the expertise of most data scientists. SageMaker makes it easy to deploy both batch and online inference as APIs.

In order to deploy, we first need to instantiate a sagemaker.model.Model object. model_data is the S3 location of the serialized model.

# See the `sagemaker.model.Model`
# [API reference](https://sagemaker.readthedocs.io/en/stable/model.html) for more details.
from sagemaker import model

model_data = 's3://.../model.tar.gz'
image = '...dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-2-cpu-py3'

churn_model = model.Model(model_data=model_data,
                          image=image,
                          role=role)

To find the right model_data and image pair for the model you want to deploy, go to the SageMaker console -> Training Jobs and click into the training job by name. The image URI is under the Algorithm panel. For the S3 location of the serialized model, scroll to the bottom of the page to the Output panel; it’s listed under S3 model artifact.
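If you prefer to do this programmatically, the same two values can be read from the training job description via the SageMaker API; a small sketch, where the job name is a placeholder:

# Look up model artifacts and training image for a finished training job
job_description = sagemaker_client.describe_training_job(
    TrainingJobName='<your-training-job-name>')

model_data = job_description['ModelArtifacts']['S3ModelArtifacts']
image = job_description['AlgorithmSpecification']['TrainingImage']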

5.1 Batch inference

To run batch inference, we need a sagemaker.transformer.Transformer object.

See the sagemaker.transformer.Transformer API reference for more details.

churn_transformer = churn_model.transformer(instance_count=1,
                                            instance_type='ml.m4.xlarge',
                                            strategy='MultiRecord',
                                            assemble_with='Line',
                                            output_path=f"s3://{BUCKET}/{PREFIX}/transform")

strategy can be SingleRecord or MultiRecord. assemble_with can be None or Line.

To run the batch transform job:

# Start a transform job and wait for it to finish
# Takes around 3-5 minutes
churn_transformer.transform(data=inference_data,
                            content_type='text/csv',
                            split_type='Line')

print('Waiting for transform job: ' + churn_transformer.latest_transform_job.job_name)
churn_transformer.wait()

content_type is set to 'text/csv' because we have CSV data; split_type is set to 'Line' so the input is split into one record per line, matching the assemble_with='Line' setting on the transformer.

This job is asynchronous by default. The .wait() method blocks the notebook until the job finishes.

In the log output in the notebook, we can see that the job actually starts a gunicorn web server.

To get the location of the output data, run

churn_transformer.output_path

For each input file we get a corresponding .out file that contains the predicted labels.

# List files in S3 bucket
s3_client = boto_session.client('s3')
s3_client.list_objects(Bucket = BUCKET, Prefix = f'{PREFIX}/transform')
# Download the output data from S3 to local filesystem
s3_client.download_file(
    Bucket=BUCKET,
    Key=f"{PREFIX}/transform/test-batch.csv.out",
    Filename=f'{LOCAL_DATA_DIRECTORY}/test-batch.csv.out')

# Show a few lines of this file
!head -5 ../data/churn/test-batch.csv.out
"""
0.010853796266019344
0.005068291909992695
0.008791499771177769
0.16663919389247894
0.004287515766918659
"""

The output file has one prediction per line corresponding to the input file.
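As a quick sanity check on the downloaded file, we can load the probabilities into pandas and turn them into labels; the 0.5 cutoff below is just an assumed threshold:

# Read batch transform output and threshold the churn probabilities
batch_preds = pd.read_csv(f'{LOCAL_DATA_DIRECTORY}/test-batch.csv.out',
                          header=None, names=['churn_probability'])
batch_preds['predicted_label'] = (batch_preds['churn_probability'] > 0.5).astype(int)
batch_preds.head()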

Note that after we created the transformer object, some new things appeared in the SageMaker console. One of them is the model object, found under Inference -> Models. Inside, we can see the model name, the role ARN, creation time, container info, the Docker image used, the training job, and the S3 location of the serialized model.

From that page, under Actions we can add a tag or delete the model, or create a batch transform job or an endpoint directly from the console.

We can also find the previous batch transform jobs under Inference. You can inspect the metadata and audit the entire process of these jobs if you click into them.

5.2 Online inference

SageMaker abstracts away all the API code, network and routing logic and makes it extremely easy to deploy the model as an endpoint.

Here we demonstrate using a custom trained sklearn model. Instead of sagemaker.model.Model, we use its subclass sagemaker.sklearn.SKLearnModel. See the sagemaker.sklearn.SKLearnModel API reference for more details.

from sagemaker import sklearn

model_data = 's3://.../output/model.tar.gz'

sklearn_model = sklearn.SKLearnModel(model_data=model_data,
                                     role=role,
                                     entry_point='../scripts/sklearn/sklearn_rf.py')

Again, model_data is the serialized model in S3 which can be found in the Training job section from the SageMaker console. We also need the same custom script that we used to train the model as entry_point. It contains the code to deserialize the model.

See the sagemaker.predictor.RealTimePredictor API reference for more details.

sklearn_predictor = sklearn_model.deploy(initial_instance_count=1,
                                         instance_type="ml.m4.xlarge")

NOTE: This takes about 6-8 minutes to return, after which the endpoint will be ready.

Now we can click on the Endpoint List in the sidebar and refresh it to see the new endpoint. Right click on it and choose Describe Endpoint; a new tab opens with three sub tabs:

  • Monitoring results
  • Monitoring job history
  • AWS settings

Clicking on AWS settings, we get the following view.

SageMaker endpoint

Under Production variants, there is a table of the deployed models. We only have one model now, our sklearn random forest, so it gets all the traffic. With multiple models, this feature is useful for A/B testing or for deployment strategies such as canary or blue/green deployments.

Now we can test the endpoint within the notebook as if we are just calling an sklearn object locally. The code below actually makes an HTTP request and gets the response.

df = pd.read_csv(f'{LOCAL_DATA_DIRECTORY}/test-dataset.csv', header=None)

# Remove first column which contains labels
X = df.drop(labels=0, axis=1)
X.head()

preds = sklearn_predictor.predict(X)
print(preds)

This endpoint can handle multiple records at the same time; there is no need to send a single row per request!

Here are two interesting articles for further information on using the endpoint.

5.3 Autoscaling according to demand

For batch inference, SageMaker spins up an instance, performs the computation, and shuts it down. For online inference, it spins up an instance and keeps it alive. SageMaker supports autoscaling the number of instances according to your workload.

To configure autoscaling for a model using the console:

  1. Open the Amazon SageMaker console at https://console.aws.amazon.com/sagemaker/.
  2. In the navigation pane, choose Endpoints.
  3. Choose the endpoint that you want to configure.
  4. Under the Endpoint runtime settings heading, select the radio button corresponding to the model variant that you want to configure and click Configure autoscaling. The Configure variant automatic scaling page appears.
  5. The Variant automatic scaling section lets us configure the min/max number of instances.
    • For Minimum instance count, type the minimum number of instances that you want the scaling policy to maintain. At least 1 instance is required.
    • For Maximum instance count, type the maximum number of instances that you want the scaling policy to maintain.
  6. The Built-in scaling policy section lets us configure the conditions under which to scale the instances.
    • For the Target value, type the average number of invocations per instance per minute for the model. To determine this value, follow the guidelines in Load testing. Application Auto Scaling adds or removes instances to keep the metric close to the value that you specify.
    • For Scale-in cool down and Scale-out cool down, type the number of seconds for each cooldown period.
    • Select Disable scale in to prevent the scaling policy from deleting variant instances if you want to ensure that your variant scales out to address increased traffic, but are not concerned with removing instances to reduce costs when traffic decreases. Scale-out activities are always enabled so that the scaling policy can create endpoint instances as needed.
  7. Choose Save.

More details, including how to define a custom scaling policy, can be found in the Developer Guide.

There is no silver bullet for defining the auto scaling policy for your use case. Just come up with an estimate and iteratively find what works for you.
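For reference, the same kind of target-tracking policy can also be set up in code through the Application Auto Scaling API; a sketch under the assumption that the endpoint uses the default variant name 'AllTraffic' and that 200 invocations per instance per minute is an acceptable target:

import boto3

autoscaling_client = boto3.client('application-autoscaling')
resource_id = f'endpoint/{sklearn_predictor.endpoint}/variant/AllTraffic'

# Register the endpoint variant as a scalable target with min/max instance counts
autoscaling_client.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,
    MaxCapacity=4)

# Attach a target-tracking policy on invocations per instance
autoscaling_client.put_scaling_policy(
    PolicyName='invocations-target-tracking',
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 200.0,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'},
        'ScaleInCooldown': 300,
        'ScaleOutCooldown': 60})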

5.4 Delete the endpoint

Note that you will be charged for the resources you created in this section: the models and the endpoint. They are under Inference in the SageMaker console. You can either delete them in the GUI, or run the code below in the notebook:

churn_model.delete_model()

sklearn_predictor.delete_endpoint()
sklearn_model.delete_model()

6. Model Monitoring

Note: monitoring is only available after the live endpoint is up for at least 1 hour.

A machine learning model may perform well at the beginning but degrade over time. One big cause is data drift: the input data distribution changes over time. In SageMaker, we can set up monitoring for endpoints to log and store live data. With this capability, we can compare live data with historical data to identify drift or data quality issues.

6.1 Enable live data capture for a model endpoint

First of all, get the prerequisites set up just like in the previous section.

%cd /root/sagemaker-course/notebooks/

import time

import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role, model

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', 20)
pd.set_option('display.max_rows', 70)

# S3 bucket information
BUCKET = '<bucketname>'
PREFIX = 'churn'
LOCAL_DATA_DIRECTORY = f'../data/{PREFIX}'
print(f"Artifacts will be written to s3://{BUCKET}/{PREFIX}")

# Session variables we'll use throughout the notebook
sagemaker_session = sagemaker.Session()
boto_session = sagemaker_session.boto_session
sagemaker_client = boto_session.client('sagemaker')
role = get_execution_role()
print(f'Role ARN: {role}')

Before we can set up the monitoring, we need a live endpoint. The previous section showed how, but let’s do it again for the XGBoost model.

model_data = 's3://<bucket>/churn/.../output/model.tar.gz'
image = '...'

churn_model = model.Model(model_data=model_data,
                          image=image,
                          role=role,
                          sagemaker_session=sagemaker_session)

Again, model_data and image can be found in SageMaker console -> Training jobs -> the xgboost model we trained.

To set up the monitoring, we need to configure a sagemaker.model_monitor.DataCaptureConfig object: set enable_capture to True, set the percentage of data to be captured to 100%, and set the S3 path where the captured data should be stored.

from sagemaker import model_monitor, predictor

captured_data_s3_uri = f's3://{BUCKET}/{PREFIX}/model-monitor/data-capture'
print(f'Captured data stored in bucket: {captured_data_s3_uri}')

data_capture_config = model_monitor.DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=captured_data_s3_uri,
    sagemaker_session=sagemaker_session)

# Deploy the endpoint if you don't have one yet. Takes a while
# Note that data_capture_config is passed in
churn_model.deploy(initial_instance_count=1,
                   instance_type='ml.t2.medium',
                   data_capture_config=data_capture_config)

Notice that data_capture_config is passed into the model deployment for monitoring to work.

To call this endpoint in code, we need predictor.RealTimePredictor. It encapsulates the HTTP request.

churn_predictor = predictor.RealTimePredictor(
    endpoint=churn_model.endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer = predictor.csv_serializer,
    content_type='text/csv')

The part serializer = predictor.csv_serializer means that the predictor serializes the csv data into bytes BEFORE sending it over the wire to the endpoint via an HTTP request.

Here is an example of faking some live traffic using a CSV file.

print("Sending test traffic to the endpoint {}. \nPlease wait for a minute..."
        .format(churn_model.endpoint_name))

with open(f'{LOCAL_DATA_DIRECTORY}/test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        response = churn_predictor.predict(data=payload)
        time.sleep(0.5)

We can check whether the data was captured in S3 by looking at the output location. The format of the Amazon S3 output path is:

s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl

We can inspect this file by downloading it and printing out its contents:

import json
from sagemaker.s3 import S3Uploader, S3Downloader

current_endpoint_capture_prefix = f'{PREFIX}/model-monitor/data-capture/{churn_model.endpoint_name}'
capture_files = S3Downloader.list(f"s3://{BUCKET}/{current_endpoint_capture_prefix}")

print("Found Data Capture Files:")
print(capture_files)

capture_file = S3Downloader.read_file(capture_files[-1])

print("=====Single Data Capture====")
print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2)[:2000])

"""
=====Single Data Capture====
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "186,0.1,137.8,97,187.7,118,146.4,85,8.7,6,1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,0.10,0.11,0.12,0.13,0.14,0.15,0.16,0.17,1.1,0.18,0.19,0.20,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.30,0.31,0.32,0.33,0.34,0.35,0.36,0.37,0.38,0.39,0.40,0.41,0.42,0.43,0.44,0.45,0.46,0.47,0.48,0.49,0.50,0.51,0.52,0.53,1.2,1.3,0.54,1.4,0.55",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.014719205908477306",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "f38dfc73-c631-4396-bffc-3313a7157be7",
    "inferenceTime": "2020-05-20T14:08:54Z"
  },
  "eventVersion": "0"
}
"""

In the printed file content we see that our input data is under captureData:endpointInput:data, and the endpoint gives back the prediction in endpointOutput:data.

By simply passing in a model_monitor.DataCaptureConfig, we have enabled data capture for model monitoring! This way, all the live data and the corresponding predictions are nicely logged in S3.

6.2 Generating constraints and suggestions from a baseline dataset

We use our training data as the baseline for comparison. First, upload the training data to S3.

baseline_prefix = f'{PREFIX}/baselining'
baseline_data_prefix = f'{baseline_prefix}/data'
baseline_results_prefix = f'{baseline_prefix}/results'

# Store the training data
baseline_data_uri = f's3://{BUCKET}/{baseline_data_prefix}'
# Store the results of our baseline job
baseline_results_uri = f's3://{BUCKET}/{baseline_results_prefix}'

print(f'Baseline data URI: {baseline_data_uri}')
print(f'Baseline results URI: {baseline_results_uri}')

baseline_data_path = S3Uploader.upload(
    f"{LOCAL_DATA_DIRECTORY}/training-dataset-with-header.csv",
    baseline_data_uri)

model_monitor.DefaultModelMonitor is a plug-and-play tool that only needs the dataset and optional pre- and post-processing scripts. There is a more advanced and customizable option, sagemaker.model_monitor.ModelMonitor, if you need more control.

my_default_monitor = model_monitor.DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_path,
    dataset_format=model_monitor.DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True)

Set wait=True to make it a blocking call. The baseline dataset must have the exact same schema as the live data, or the comparison will fail.

The log printed after this cell is quite long because SageMaker actually runs a Spark job here. Skipping through all the Hadoop/YARN output, we focus on Constraints, which is what we want from the monitor.

SageMaker monitor constraints

Then there is the stats part in the log. It has some summary statistics of each feature.

This particular processing job takes about 6-8 minutes to run.

The log is very long and hard to read. To make it easier to read, we can load the JSON files into pandas dataframes.

S3Downloader.list(f's3://{BUCKET}/{baseline_results_prefix}')
"""
['s3://<bucketname>/churn/baselining/results/constraints.json',
 's3://<bucketname>/churn/baselining/results/statistics.json']
"""

constraints_df = pd.json_normalize(
    my_default_monitor.suggested_constraints().body_dict["features"])
constraints_df.head(5)
"""
name	inferred_type	completeness	num_constraints.is_non_negative
0	Churn	Integral	1.0	True
1	Account Length	Integral	1.0	True
2	VMail Message	Integral	1.0	True
3	Day Mins	Fractional	1.0	True
4	Day Calls	Integral	1.0	True
"""

The statistics.json file contains statistical information about the data in the baseline. We can view these statistics by calling the baseline_statistics method.

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(my_default_monitor.baseline_statistics().body_dict["features"])
schema_df.head(5)

"""
name	inferred_type	numerical_statistics.common.num_present	numerical_statistics.common.num_missing	numerical_statistics.mean	numerical_statistics.sum	numerical_statistics.std_dev	numerical_statistics.min	numerical_statistics.max	numerical_statistics.distribution.kll.buckets	numerical_statistics.distribution.kll.sketch.parameters.c	numerical_statistics.distribution.kll.sketch.parameters.k	numerical_statistics.distribution.kll.sketch.data
0	Churn	Integral	2333	0	0.139306	325.0	0.346265	0.0	1.0	[{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou...	0.64	2048.0	[[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0,...
"""

The above is just to show what columns (statistics) are available.

The most important columns for detecting drift are the numerical_statistics.distribution.kll.* ones; the KLL sketch is a compressed way to describe a distribution.

6.3 Creating a monitoring schedule

Now that we have the statistics to monitor drift, we can set up a periodic schedule to run the check.

Note that it takes at least 1 hour to run the code in this section. It’s because the most frequent rate for the SageMaker monitoring job is hourly.

To create the monitoring schedule, we use the create_monitoring_schedule method of the default monitor object. We specify the endpoint, the desired S3 location for the report (which captures violations in the live data given the constraints), the statistics and constraints from the default monitor object, the desired schedule name, a cron expression generated by the SageMaker SDK, and whether to emit CloudWatch metrics for visualization later.

from time import gmtime, strftime

reports_prefix = f'{PREFIX}/reports'
s3_report_path = f's3://{BUCKET}/{reports_prefix}'

mon_schedule_name = 'xgb-churn-model-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

my_default_monitor.create_monitoring_schedule(
    endpoint_input=churn_predictor.endpoint,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    monitor_schedule_name=mon_schedule_name,
    schedule_cron_expression=model_monitor.CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True)

We can use a background thread to mimic live data.

If there’s no live data coming into the endpoint, the monitoring job will fail later on.

from threading import Thread
from time import sleep
import time

runtime_client = boto3.client('runtime.sagemaker')

# (repeating the invocation logic from above so this section can run independently)
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        for row in f:
            payload = row.rstrip('\n')
            response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                                      ContentType='text/csv',
                                                      Body=payload)
            time.sleep(10)

def invoke_endpoint_forever():
    while True:
        invoke_endpoint(churn_model.endpoint_name, f'{LOCAL_DATA_DIRECTORY}/test-dataset-input-cols.csv', runtime_client)

thread = Thread(target = invoke_endpoint_forever)
thread.start()

Note that you need to stop the kernel to stop the endpoint invocations by the thread.

Now we can wait for the first monitoring output. The first one will come in on the hour.

mon_executions = my_default_monitor.list_executions()
if len(mon_executions) == 0:
    print("We created a hourly schedule above and it will kick off executions ON the hour.\n"
        "We will have to wait till we hit the hour...\n\n")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = my_default_monitor.list_executions()

If we go to the GUI, click on Endpoints in the left sidebar, and right click on the endpoint, Describe Endpoint also shows these monitoring executions under the Monitoring job history tab.

6.4 Visualizing drift by comparing data distributions

After the monitoring job runs, we see that it produces three files:

  • constraint_violations.json
  • constraints.json
  • statistics.json

We can download them from S3, or put them into a dataframe like this:

violations = my_default_monitor.latest_monitoring_constraint_violations()
violations_df = pd.json_normalize(violations.body_dict["violations"])
violations_df.head(10)

"""
feature_name	constraint_check_type	description
0	State_OR	data_type_check	Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.30555555555556% of data is Integral.
1	State_WV	data_type_check	Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.30555555555556% of data is Integral.
2	State_UT	data_type_check	Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.30555555555556% of data is Integral.
3	State_SC	data_type_check	Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.30555555555556% of data is Integral.
...
"""

latest_monitoring_violations = my_default_monitor.latest_monitoring_constraint_violations()
latest_monitoring_statistics = my_default_monitor.latest_monitoring_statistics()

Next, we get the execution and baseline details from the processing job ARN.

Enter the ProcessingJob ARN for an execution of a MonitoringSchedule below to get the result files associated with that execution:

# Take the most recent monitoring execution (assuming at least one has completed)
latest_execution = mon_executions[-1]
processing_job_arn = latest_execution.describe()['ProcessingJobArn']

import os

from sagemaker.model_monitor import MonitoringExecution

execution = MonitoringExecution.from_processing_arn(
    sagemaker_session=sagemaker_session,
    processing_job_arn=processing_job_arn)

exec_inputs = {inp['InputName']: inp for inp in execution.describe()['ProcessingInputs']}
exec_results = execution.output.destination

baseline_statistics_filepath = exec_inputs['baseline']['S3Input']['S3Uri'] if 'baseline' in exec_inputs else None
execution_statistics_filepath = os.path.join(exec_results, 'statistics.json')
violations_filepath = os.path.join(exec_results, 'constraint_violations.json')

baseline_statistics = json.loads(S3Downloader.read_file(baseline_statistics_filepath)) if baseline_statistics_filepath is not None else None
execution_statistics = json.loads(S3Downloader.read_file(execution_statistics_filepath))
violations = json.loads(S3Downloader.read_file(violations_filepath))['violations']

We need to download a utility file from SageMaker to plot the distributions:

!wget https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py

import utils as mu

mu.show_violation_df(baseline_statistics=baseline_statistics,
                     latest_statistics=execution_statistics,
                     violations=violations)

SageMaker infers the data types from the baseline dataset we provided. It knows which features are integral and which are fractional.

We can also look at the summary statistics of each feature:

features = mu.get_features(execution_statistics)
feature_baselines = mu.get_features(baseline_statistics)
mu.show_distributions(features)

"""
	num_present	num_missing	mean	sum	std_dev	min	max
Churn	288	0	0.121015	34.85218	0.246669	0.002989	0.993593
Account Length	288	0	99.760417	28731.00000	40.589513	10.000000	232.000000
VMail Message	288	0	6.702083	1930.20000	12.276850	0.000000	43.000000
Day Mins	288	0	178.247222	51335.20000	54.363873	46.500000	328.100000
...

<IT ALSO PLOTS THE DISTRIBUTION FOR EACH FEATURE>
"""

A very handy feature is plotting the live data distribution against the baseline:

mu.show_distributions(features, feature_baselines)

Live vs Baseline

We can quickly see that some of the baseline data are integers while some of the live data are fractions. Overlaying them gives us a great view to look for drift.

Note that other useful information and visualizations are available in:

  • “Describe endpoint” in sidebar’s endpoint tab
  • SageMaker console -> Inference -> Endpoints

Check them out as well!

6.5 Delete resources

If we don’t delete the resources we created, we will get charged. We now have three kinds of resources:

  • endpoint
  • model
  • monitoring schedule job

To delete all of them:

sagemaker_session.delete_monitoring_schedule(mon_schedule_name)
sagemaker_session.delete_endpoint(churn_model.endpoint_name)
sagemaker_session.delete_model(churn_model.name)

Other SageMaker features

  • SageMaker Ground Truth: build and manage training datasets, similar to Figure Eight
  • SageMaker Autopilot: build foolproof models for us
  • SageMaker Neo: deploy models on different hardware
  • SageMaker Marketplace: buy and sell model packages

Reference

  • Luigi’s SageMaker course