AWS for Games Blog

Generate custom game events from Unreal Engine with the Game Analytics Pipeline

Today’s game developers use analytics as a critical workload to deliver the best gameplay experiences possible for their players. As developers look to have more control over where their data is stored and to reduce the complexities of ingesting data from a variety of sources, many turn to AWS to help them create their own custom analytics pipelines and data lakes. While this data can be generated by any producer, from game servers to marketing services to publishing platforms, the most critical producer of data is player data generated by active users of a game on the game client and builds.

This blog post is a step-by-step tutorial that details how to ingest data from games developed in the Unreal game engine using the AWS C++ SDKs with the one-click deployable Game Analytics Pipeline solution as a first step to setting up your own custom game analytics pipeline.

With this solution, as documented in our blog post “Ingest and Visualize Streaming Data for Games,” there are two approaches to ingesting telemetry data:

  1. Proxy integration with the solution API events endpoint: Choose this option if you require custom REST proxy integration for ingestion of game events and prefer to not include information about your KDS stream on the client. Applications send events to the events endpoint that synchronously proxies the request to KDS and returns a response to the client. This option provides more control since you are able to leverage security services for better edge protection, such as AWS WAF and AWS Shield.
  2. Direct integration with Amazon Kinesis Data Streams: Choose this option if you want to publish events from your games and services directly to Amazon Kinesis Data Streams (KDS) without the solution’s API Gateway. This is useful if you are a game developer new to AWS real-time streaming analytics but familiar with C++ libraries, or if you prefer to not manage API Gateway as part of your deployment. This also removes the added cost associated with using API Gateway.

Both of these methods can be used for cross-platform game releases including mobile games, PC games, and console games. This blog post specifically focuses on the first type of ingestion: direct integration with KDS using the AWS C++ SDKs.

This blog post contains many sections on concepts and architecture that are similar or identical to those in a past post about Unity, which can be found here. However, the code integrations and engine settings in this blog post have a specific focus on Unreal Engine and AWS C++ SDK, as opposed to the Unity Engine with AWS .NET SDK.

Disclaimer: The code in this blog is meant for tutorial purposes only and is not production-ready code.

Requirements

Ensure you have access to the following before starting this tutorial:

  • An AWS account
  • The Game Analytics Pipeline solution deployed in your AWS account – follow the Deployment Guide for more information
  • Unreal Engine 4 or higher (Unreal Engine 5 is supported) – this solution has not been validated using lower versions of Unreal
  • Integrating the AWS C++ SDK through completion of this blog post
  • Intermediate level knowledge of C++ and Unreal Engine

Initial Setup

Continuing from setting up the AWS C++ SDK with Unreal Engine, we will first be adding the “Json” and “JsonUtilities” modules to the project build file.

  • Navigate to your Project’s Build.cs file (should be in a location like: [ProjectName]\Source\[ProjectName]\[ProjectName].Build.cs). Add the Json and JsonUtilities Modules you created earlier as a dependency (You will see below I added “Json” and “JsonUtilities” in the list).

ExampleProject.Build.cs

PublicDependencyModuleNames.AddRange(new string[] { "Core", "CoreUObject", "Engine", "InputCore", "AWSSDK", "Json", "JsonUtilities" });//...
//...
bEnableUndefinedIdentifierWarnings = false;

Setup an Amazon Cognito Managed Identity Pool

Next, create a Managed Identity Pool using Amazon Cognito. This allows users to assume a role-based identity using an Identity & Access Management policy and put records into your Kinesis stream.

  1. Get started in Amazon Cognito by selecting Manage Identity Pools, then select Create New Identity Pool.
  2. Get started in Amazon Cognito by selecting Manage Identity Pools, then select Create New Identity Pool.
  3. Enter in an Identity pool name and make sure to check Enable access to unauthenticated identities. Then select Create Pool. Unauthenticated identities are anonymous users who you will allow to put records into your stream.

A note about unique identifiers
If you wish to collect user data such as unique identifiers or session state, or add user customized features, you’ll need to add authentication into the client using both the Amazon Cognito SDKs and Amazon Cognito User Pools. For the purposes of this blog, the use case focuses on gathering data from anonymous users, which requires the use of Amazon Cognito Unauthenticated Identities.

If you wish to collect user data such as unique identifiers or session state, or add user customized features, you'll need to add authentication into the client using both the Amazon Cognito SDKs and Amazon Cognito User Pools. For the purposes of this blog, the use case focuses on gathering data from anonymous users, which requires the use of Amazon Cognito Unauthenticated Identities.

  1. You will be prompted to create a new IAM role that users in this unauthenticated identity pool can assume. Click the Show Policy document drop down.
  2. If you are new to AWS, the default role allows cognito-sync and PutEvents, but has no resources that it can act on. As this is not what we are using, the role must be edited for your records to be put into the stream and to remove access for services you are not using.

Instead of the default, your role should match the following snippet to work with this tutorial. Ideally, permissions for this Role should be minimal since it is intended for guest access:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": [
                "COPY-ARN-FOR-THE-SOLUTION-STREAM-HERE"
            ]
        }
    ]
}

This role will allow you to call both PutRecord and PutRecords on the Kinesis Stream resource. Make sure to copy the ARN for the Kinesis Stream that was created by the one-click deployable solution. Once you’ve adjusted the role, click Allow.

  1. You will find sample code under Get AWS Credentials. Copy the ID next to “// Identity Pool ID” in prep for the next step.

Note: The bottom of the blog will have the full, completed script to help while you follow along the below steps

Modify your script to add Cognito Identity Pools

First we will add code changes to the MyActor header and cpp files to connect to an AWS Cognito Identity Pool, retrieve an AWS Access Key and Secret Key, and set as an ephemeral variable for the AWS SDK Kinesis Client that we create in the later steps.

  1. Append your MyActor.h file’s header portion with the following:
  2. //...Other #includes above
    #include <aws/core/Aws.h>
    #include <aws/core/auth/AWSCredentialsProvider.h>
    #include <aws/core/auth/AWSCredentials.h>
    #include <aws/core/utils/threading/Executor.h>
    #include <aws/cognito-identity/CognitoIdentityClient.h>
    #include <aws/cognito-identity/model/GetIdRequest.h>
    #include <aws/cognito-identity/model/GetCredentialsForIdentityRequest.h>
    // We will include more libraries here in the following steps
    #include "MyActor.generated.h"
    //...Rest of #includes, make sure .generated.h is the last one
  3. Add the following variable and function declarations to the MyActor.h file:
  4. //...MyActor constructor, BeginPlay, other functions are above
    
    private:
        // AWS SDK Configuration Variables
        
        static Aws::String AWS_ACCOUNT_ID;
    
        static Aws::String AWS_REGION;
    
        static Aws::String COGNITO_IDENTITY_POOL_ID;
        
        // We will include more variables here in the following steps
    
        // AWS SDK Non-Configuration Variables
    
        Aws::SDKOptions options;
    
        Aws::Auth::AWSCredentials *credentials;
        
        // We will include more variables and function declarations here in the following steps
  5. Modify your MyActor.cpp file with the following code:
  6. // #Includes are above
    
    // Account ID
    Aws::String AMyActor::AWS_ACCOUNT_ID = "INSERT_YOUR_AWS_ACCOUNT_ID_HERE";
    
    // Region
    Aws::String AMyActor::AWS_REGION = Aws::Region::INSERT_YOUR_AWS_REGION_HERE;
    
    // Cognito Identity Pool
    Aws::String AMyActor::COGNITO_IDENTITY_POOL_ID = "INSERT_THE_COGNITO_IDENTITY_POOL_ID_HERE";
    
    // We will be adding more global variables here in later steps
    
    // Other global variables and functions below
  7. Replace the variable’s values with the corresponding:
    1. AWS_ACCOUNT_ID: The AWS Account ID that the Game Analytics Pipeline and Cognito Identity Pool are in. Example: 912345678901
    2. AWS_REGION: The AWS Region that the Game Analytics Pipeline and Cognito Identity Pool are in. The format must follow as the AWS C++ SDK documentation states. Example: US_EAST_1.
    3. COGNITO_IDENTITY_POOL_ID: The Cognito Identity Pool ID you retrieved from step 6 under the “Setup an Amazon Cognito Managed Identity Pool“ section. Example: us-east-1:234a5b67-8901-2a3b-4567-c89012d34e56
  8. Replace the BeginPlay function in MyActor.cpp:
  9. void AMyActor::BeginPlay()
    {
        // Call the base class
        Super::BeginPlay();
    
        // Initialize AWS SDK API
        Aws::InitAPI(options);
    
        // Set AWS Client Configuration
        Aws::Client::ClientConfiguration clientConfig;
        clientConfig.region = AWS_REGION;
    
        // Grab AWS Cognito Identity Pool information
        Aws::String identityId;
        std::shared_ptr<Aws::CognitoIdentity::CognitoIdentityClient> cognitoIdentityClient = Aws::MakeShared<Aws::CognitoIdentity::CognitoIdentityClient>("CognitoIdentityClient", clientConfig);
    
        // Create and send request to Cognito to retrieve identity
        Aws::CognitoIdentity::Model::GetIdRequest getIdRequest;
        getIdRequest.SetAccountId(AWS_ACCOUNT_ID);
        getIdRequest.SetIdentityPoolId(COGNITO_IDENTITY_POOL_ID);
        Aws::CognitoIdentity::Model::GetIdOutcome getIdOutcome{cognitoIdentityClient->GetId(getIdRequest)};
    
        // If request to Cognito Identity Pool is success, retrieves the Access Key
        if (getIdOutcome.IsSuccess())
        {
            Aws::CognitoIdentity::Model::GetIdResult getIdResult{getIdOutcome.GetResult()};
            identityId = getIdResult.GetIdentityId();
        }
        // Uses Access Key to request and retrieve Secret Access Key
        Aws::CognitoIdentity::Model::GetCredentialsForIdentityRequest getCredsRequest;
        getCredsRequest.SetIdentityId(identityId);
        Aws::CognitoIdentity::Model::GetCredentialsForIdentityOutcome getCredsOutcome{cognitoIdentityClient->GetCredentialsForIdentity(getCredsRequest)};
        Aws::CognitoIdentity::Model::Credentials cognitoCredentials;
        if (getCredsOutcome.IsSuccess())
        {
            Aws::CognitoIdentity::Model::GetCredentialsForIdentityResult getCredsResult{getCredsOutcome.GetResult()};
            cognitoCredentials = getCredsResult.GetCredentials();
        }
        // Sets AWS Credentials based on Cognito Credentials
        credentials = new Aws::Auth::AWSCredentials(cognitoCredentials.GetAccessKeyId(), cognitoCredentials.GetSecretKey(), cognitoCredentials.GetSessionToken());
    
        // Optionally Print out the credentials information for debugging
        /*
        FString creds(credentials->GetAWSAccessKeyId().c_str());
        UE_LOG(LogTemp, Warning, TEXT("Credentials Access Key Id: %s"), *creds);
        FString credsS(credentials->GetAWSSecretKey().c_str());
        UE_LOG(LogTemp, Warning, TEXT("Credentials Secret Key: %s"), *credsS);
        FString credsX(credentials->GetSessionToken().c_str());
        UE_LOG(LogTemp, Warning, TEXT("Credentials Session Token: %s"), *credsX);
        */
    
        // Shut down the AWS SDK API, will be started back once records are being sent to Kinesis
        Aws::ShutdownAPI(options);
    }
  10. Note the comments and authentication process, and try running the script with the actor in your scene to verify functionality (and optionally uncomment the print statements to see the credential information as well)

Modify your script to handle your records, batching, and ingestion

Next we append to the MyActor header and cpp files to set functionality that, upon trigger, will create event records, combine into a batch, and at a certain point send the event to the Kinesis Data Stream using the AWS C++ SDK.

  1. Append your MyActor.h file’s header portion with the following:
  2. //...Other #includes above
    #include <aws/core/Aws.h>
    #include <aws/core/auth/AWSCredentialsProvider.h>
    #include <aws/core/auth/AWSCredentials.h>
    #include <aws/core/utils/threading/Executor.h>
    #include <aws/cognito-identity/CognitoIdentityClient.h>
    #include <aws/cognito-identity/model/GetIdRequest.h>
    #include <aws/cognito-identity/model/GetCredentialsForIdentityRequest.h>
    
    //...INSERT NEW INCLUDES BELOW:
    
    #include <aws/kinesis/KinesisClient.h>
    #include <aws/kinesis/model/DescribeStreamRequest.h>
    #include <aws/kinesis/model/DescribeStreamResult.h>
    #include <aws/kinesis/model/GetRecordsRequest.h>
    #include <aws/kinesis/model/GetRecordsResult.h>
    #include <aws/kinesis/model/GetShardIteratorRequest.h>
    #include <aws/kinesis/model/GetShardIteratorResult.h>
    #include <aws/kinesis/model/Shard.h>
    #include <aws/kinesis/model/PutRecordsResult.h>
    #include <aws/kinesis/model/PutRecordsRequest.h>
    #include <aws/kinesis/model/PutRecordsRequestEntry.h>
    #include "MyActor.generated.h"
    //...Rest of #includes, make sure .generated.h is the last one
  3. Add the following variable and function declarations to the MyActor.h file:
  4. //...AMyActor constructor, BeginPlay, other functions are above
    
    private:
        // AWS SDK Configuration Variables
        
        static Aws::String AWS_ACCOUNT_ID;
    
        static Aws::String AWS_REGION;
    
        static Aws::String COGNITO_IDENTITY_POOL_ID;
        
        // ...
        // ADDING NEW VARIABLES BELOW:
        // ...
    
        static FString GAP_APPLICATION_ID;
    
        static FString KINESIS_STREAM_NAME;
    
        static int BATCH_SIZE;
    
        // AWS SDK Non-Configuration Variables
    
        Aws::SDKOptions options;
    
        Aws::Auth::AWSCredentials *credentials;
        
        // ...
        // ADDING NEW VARIABLES/FUNCTIONS BELOW:
        // ...
        
         Aws::Kinesis::KinesisClient *kinesisClient;
         
        // Raw Event Records
    
        static TArray<TSharedPtr<FJsonObject>> raw_records;
        
        // Functions
    
        void CreateGameOverEvent(int8 wins, int8 losses);
    
        void CreateRecord(TSharedPtr<FJsonObject> event_data, FString event_name);
    
        void GenerateBatch(TSharedPtr<FJsonObject> record, FString partitionKey);
    
        void PutRecords(TArray<TSharedPtr<FJsonObject>> records, FString partitionKey);
    
        void OnPutRecordsAsyncOutcomeReceived(const Aws::Kinesis::KinesisClient *, const Aws::Kinesis::Model::PutRecordsRequest &, const Aws::Kinesis::Model::PutRecordsOutcome &outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &);
    
        FString FormatUUID(FString uuid);
    
  5. Modify your MyActor.cpp file with the following code:
  6. // #Includes are above
    
    // Account ID
    Aws::String AMyActor::AWS_ACCOUNT_ID = "INSERT_YOUR_AWS_ACCOUNT_ID_HERE";
    
    // Region
    Aws::String AMyActor::AWS_REGION = Aws::Region::INSERT_YOUR_AWS_REGION_HERE;
    
    // Cognito Identity Pool
    Aws::String AMyActor::COGNITO_IDENTITY_POOL_ID = "INSERT_THE_COGNITO_IDENTITY_POOL_ID_HERE";
    
    // ...
    // ADDING NEW VARIABLES BELOW:
    // ...
    
    // Application ID from the Solution
    FString AMyActor::GAP_APPLICATION_ID = "INSERT_GAP_APPLICATION_ID_HERE";
    
    // Kinesis Stream Name
    FString AMyActor::KINESIS_STREAM_NAME = "INSERT_KINESIS_STREAM_ID_HERE";
    
    // The number of records collected before a batch is sent to Amazon Kinesis
    // Streams. In production this should be much higher, but for this demo
    // script it is set to 4
    int AMyActor::BATCH_SIZE = 4;
    
    // A list that holds our records to batch them
    TArray<TSharedPtr<FJsonObject>> AMyActor::raw_records;
    
    // Other global variables and functions below
  7. Replace the variable’s values with the corresponding:
    1. GAP_APPLICATION_ID: The Application ID sent to the Game Analytics Pipeline, which tracks a UUID for the application in the case of multiple games/applications. To find your application ID, in your CloudFormation’s stack for the one-click deployable solution, click Outputs and search for TestApplicationId. Example: 234a5b67-8901-2a3b-4567-c89012d34e56
    2. KINESIS_STREAM_NAME: The name of the Kinesis stream that the events will be sent to. To find your stream name, in your CloudFormation’s stack for the one-click deployable solution, click Outputs and search for GameEventsStream. Example: GAP-stack-GameEventsStream-1ABCd2EfghIj.
  8. Now we will add the primary portion of the script. The Game Analytics Pipeline requires a specific schema in order for records to be added to the stream correctly, which is broken down in the following general format:
{
    "application_id": "234a5b67-8901-2a3b-4567-c89012d34e56", // Game Analytics Pipeline Application ID
    "event": {
        "event_id": "987c5a65-4321-0g9a-8765-b43219a87b65", // Random generated UUID
        "event_type": "gameover", // Custom game event type, can be any general category of events that several event names can be under
        "event_name": "gameover", // Custom game event name, can be the same as event type if there is no general category
        "event_timestamp": 3829729423, // Unix-converted UTC timestamp of event date
        "event_version": "1.0.0", // Version of the event schema in case the structure changes over time due to new releases or changes
        "app_version": "1.0.0", // Version of the application/game, to allocate for multiple game release versions
        "event_data": {
            "wins": 1, // Custom data for event, in this case a game over event where player wins 1 game
            "losses": 0,
            "platform": "UnrealEditor"
        }
    }
 }
  1. Our code will do the following:
    1. The CreateGameOverEvent function will create the innermost nested JSON, the “event_data” portion, based on the parameters passed in when we trigger this function. The “event_data” can be completely customized to meet your ideal event parameters. Then calls the below function.
    2. The CreateRecord function will grab the “event_data” and both encapsulate and enrich the data with the other parameters, such as “event_id”, “event_name”, “event_timestamp”, so that the event can be properly partitioned and be queried. Then calls the below function.
    3. The GenerateBatch function will grab the above data and encapsulate with a top-level “event” parameter, and the “application_id” set with the Game Analytics Pipeline Application ID, append to a global array holding the batch of events, and once it reaches a certain size, package and send to Kinesis, which would call the below function.
    4. The PutRecords function will grab the current filled batch of events, iterate through the batch, and for each event in the batch serialize the data to a string, then into byte data, then into a Kinesis PutRecords Request Entry, then batched back into a PutRecords Request, which will then be the data sent in a PutRecordsAsync call, which will use a memory stream to Base 64 encode the data, then send it asynchronously to the Kinesis Data Stream. A callback is added to the async call to trigger when we get a response, which would call the below function.
    5. The OnPutRecordsAsyncOutcomeReceived function is optional, but will execute actions upon a response from Kinesis. In this case, the actions are to either print a success or an error message based on the Kinesis response.
    6. We also have a UUID helper function to help convert Unreal Engine’s GUID object to a universal UUID format. Unreal Engine has a format for GUIDs that look like the following: “234A5B6789012A3B4567C89012D34E56”. The helper function will convert to lower case and add dashes to look like the following: “234a5b67-8901-2a3b-4567-c89012d34e56”.

A note about batching

Batching your records before sending them to your Amazon Kinesis stream will enable you to call fewer PutRecords requests and is both efficient and a way to cost optimize your communication. In the sample above, the batch size is set to 4 to allow you to get it working, but your game in production should be set higher. Each PutRecords request can support up to 500 records and reach record can be as large as 1 MB up to a limit of 5 MB for the entire request. For more information about Kinesis Streams quotas and limits visit the documentation here.

Other considerations

The previous script does not handle retries or situations where players have backgrounded or closed the app, but the batch has not been sent. Before pushing to production, developers should write additional logic to handle these cases.

  1. Append the following functions to the MyActor.cpp file:
// Generates a win or loss event, encapsulates in json object as a game event, and sends to CreateRecord to add other event data and encapsulate as a full event record.
// For example, an event sent to GAP has top-level fields showing data like event timestamp, event name, etc. But the nested JSON will have the actual data specific to the event, such as GameOver event data.
void AMyActor::CreateGameOverEvent(int8 wins, int8 losses)
{
    // Create new JSON object
    TSharedPtr<FJsonObject> eventData = MakeShareable(new FJsonObject);
    // Set event data fields in JSON object
    eventData->SetNumberField("wins", wins);
    eventData->SetNumberField("losses", losses);
    eventData->SetStringField("platform", "UnrealEditor");
    // Create record with event data
    CreateRecord(eventData, "gameover");
}

// Create Record enriches event data with additional parameters as JSON object
void AMyActor::CreateRecord(TSharedPtr<FJsonObject> event_data, FString event_name)
{
    // Generating UUID using Unreal's UUID type. This is then converted to a format compatible with GAP.
    FString event_id = FGuid::NewGuid().ToString();
    // Grabbing current unix timestamp time in seconds
    int64 current_time = (int64)FDateTime::UtcNow().ToUnixTimestamp();
    // Create new JSON object
    TSharedPtr<FJsonObject> record = MakeShareable(new FJsonObject);
    // Set event data fields in JSON object
    record->SetStringField("event_id", FormatUUID(event_id));
    record->SetStringField("event_type", event_name);
    record->SetStringField("event_name", event_name);
    record->SetNumberField("event_timestamp", static_cast<double>(current_time));
    record->SetStringField("event_version", "1.0.0");
    record->SetStringField("app_version", "1.0.0");
    record->SetObjectField("event_data", event_data);
    // Add to the Batch of Records
    GenerateBatch(record, event_id);
}

void AMyActor::GenerateBatch(TSharedPtr<FJsonObject> record, FString partitionKey)
{
    // Append Raw Records with new Record
    // Create new JSON object
    TSharedPtr<FJsonObject> wrappedRecord = MakeShareable(new FJsonObject);
    // Set event data fields in JSON object
    wrappedRecord->SetObjectField("event", record);
    wrappedRecord->SetStringField("application_id", GAP_APPLICATION_ID);
    // Add to array of raw records
    raw_records.Add(wrappedRecord);
    // Debug message showing a record added to list with total list size
    UE_LOG(LogTemp, Warning, TEXT("Added record to list: %s"), *FString::FromInt(raw_records.Num()));
    // Once total list size reaches the batch size it will be sent to Kinesis
    if (raw_records.Num() >= BATCH_SIZE)
    {
        // Call Put Record
        PutRecords(raw_records, partitionKey);

        // Clears raw records after they are sent for demo.
        // In production, change to only clear on successful response.
        raw_records.Empty();
    }
}

// Puts a batch of records into Kinesis
void AMyActor::PutRecords(TArray<TSharedPtr<FJsonObject>> records, FString partitionKey)
{
    // Initialize AWS SDK API
    Aws::InitAPI(options);

    // Set AWS Client Configuration
    Aws::Client::ClientConfiguration clientConfig;
    clientConfig.region = AWS_REGION;

    // Initializes Kinesis Client
    kinesisClient = new Aws::Kinesis::KinesisClient(*credentials, clientConfig);

    // Set Kinesis Put Request information
    Aws::Kinesis::Model::PutRecordsRequest putRecordsRequest;
    const Aws::String awsStreamName(TCHAR_TO_UTF8(*KINESIS_STREAM_NAME));
    putRecordsRequest.SetStreamName(awsStreamName);

    // Iterate through batch records and add into Kinesis Put Records Request
    for (int8 i = records.Num() - 1; i >= 0; --i)
    {
        // Sets an AWS Kinesis single Put Request entry, which is encapsulated together into a batch to be sent in a single request
        Aws::Kinesis::Model::PutRecordsRequestEntry putRecordsRequestEntry;

        // Unreal Engine's Json Writer will serialize the data then convert the JSON object into this FString
        FString Result;
        TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Result);
        FJsonSerializer::Serialize(records[i].ToSharedRef(), Writer);

        // Optional Logging to show how the JSON format looks like before sent to Kinesis
        // UE_LOG(LogTemp, Warning, TEXT("RESULT STRING JSON: %s"), *Result);

        // FString is then converted to AWS StringStream for compatibility
        std::string const resultToString = TCHAR_TO_UTF8(*Result);
        Aws::StringStream data;
        data << resultToString;

        // AWS StringStream is converted into byte data to send to Kinesis
        Aws::Utils::ByteBuffer bytes((unsigned char *)resultToString.c_str(), resultToString.length());

        // Sets the single Put Records entry with data and partition key
        putRecordsRequestEntry.WithData(bytes).WithPartitionKey(TCHAR_TO_UTF8(*partitionKey));

        // Pushes the single Put Records entry to a batch of put records
        putRecordsRequest.AddRecords(putRecordsRequestEntry);
    }
    // Optional log showing that data formatting is completed successfully and now sending request to Kinesis
    UE_LOG(LogTemp, Warning, TEXT("Sending request to Kinesis"));
    // Sending an asynchronous request to Kinesis to put the batch of records
    kinesisClient->PutRecordsAsync(putRecordsRequest, std::bind(&AMyActor::OnPutRecordsAsyncOutcomeReceived, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
}

// Delegate called when the request to Kinesis returns a response
void AMyActor::OnPutRecordsAsyncOutcomeReceived(const Aws::Kinesis::KinesisClient *, const Aws::Kinesis::Model::PutRecordsRequest &, const Aws::Kinesis::Model::PutRecordsOutcome &outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &)
{
    // Called when the Kinesis Data Stream accepted the data
    if (outcome.IsSuccess())
    {
        UE_LOG(LogTemp, Warning, TEXT("Successfully sent to Kinesis!"));
    }
    // Called when the Kinesis Data Stream request returns an error, along with the message
    else
    {
        Aws::String message = outcome.GetError().GetMessage();
        FString fMessage = UTF8_TO_TCHAR(message.c_str());
        UE_LOG(LogTemp, Warning, TEXT("ERROR: Could not send to Kinesis. Reason: %s"), *fMessage);
    }
    // Shuts down the AWS API now that it is no longer used
    Aws::ShutdownAPI(options);
}

// Helper function to format Unreal Engine's UUID format to a format accepted by GAP
FString AMyActor::FormatUUID(FString uuid)
{
    return (uuid.Mid(0, 8).ToLower() + "-" + uuid.Mid(8, 4).ToLower() + "-" + uuid.Mid(12, 4).ToLower() + "-" + uuid.Mid(16, 4).ToLower() + "-" + uuid.Mid(20, 12).ToLower());
}

Add key inputs to trigger game events

Now that we have created the game event and the event generation, batching, and ingestion process, we need to find a way to trigger the process and watch it in action. Somewhere else in your game, either tied to a button or specific action, you would call “Game Over” and pass wins and losses to the event.

In the below example, it is assumed you are using an example keybind to send your events and call “Game Over” in a “character”. To achieve this without creating a new character script and calling MyActor, you can temporarily change MyActor to a character subclass to allow keybind actions by:

  • Changing #include “GameFramework/Actor.h” in the MyActor.h header file to #include “GameFramework/Character.h”
  • Changing “MyActor : public AActor” in the class declaration to “MyActor : public ACharacter”.
  • Adding #include “Components/InputComponent.h” and #include “GameFramework/InputSettings.h” to the MyActor.cpp file

Refer to the full code snippets at the very bottom of the blog for guidance on code integration.

  1. Append the following class and function to the “MyActor.h” header file:
//... #includes above

class UInputComponent;

//... Class declaration, constructor declaration, and other function/variable declarations below
//... Other functions above

protected:
    virtual void SetupPlayerInputComponent(UInputComponent *InputComponent) override;
  1. Append the following function to the “MyActor.cpp” file:
//... Other functions above

void AMyActor::SetupPlayerInputComponent(class UInputComponent *PlayerInputComponent)
{
    // Set up gameplay key bindings
    check(PlayerInputComponent);

    // Create delegates to enter two parameters into event generator function
    DECLARE_DELEGATE_TwoParams(winParam, int8, int8);
    DECLARE_DELEGATE_TwoParams(loseParam, int8, int8);

    // Respond when our "Win" key is released.
    PlayerInputComponent->BindAction<winParam>("WinGame", IE_Released, this, &AMyActor::CreateGameOverEvent, (int8)1, (int8)0);
    // Respond when our "Lose" key is released.
    PlayerInputComponent->BindAction<loseParam>("LoseGame", IE_Released, this, &AMyActor::CreateGameOverEvent, (int8)0, (int8)1);
}
  1. Note the keybind names in the BindAction function call are “WinGame” and “LoseGame”. To set the keybinds, in the Unreal Editor go to Edit → Project Settings → Input → Bindings → Action Mappings and add “WinGame” and “LoseGame” (case sensitive) with the respective keys. When starting the level, based on the batch size of 4, the keys will need to be pressed four times before it is put into the Kinesis Stream and then finally into S3.

Verify data ingestion by checking S3 & Athena

  1. Go to the AWS Management Console and search for Amazon S3.
  2. If your records are successfully put into the stream you should see records in your S3 bucket generated by the one-click deployable solution titled “[Cloudformation-name]-analyticsbucket-[random string of numbers/letters]”.
  3. If your records are successfully put into the stream you should see records in your S3 bucket generated by the one-click deployable solution titled “[Cloudformation-name]-analyticsbucket-[random string of numbers/letters]”.
  4. Under raw_events you should see partitions titled “year=2022” (or current year) followed by month and day folders. Diving into these, you hopefully will see a file that looks like this:
  5. Under raw_events you should see partitions titled “year=2022” (or current year) followed by month and day folders. Diving into these, you hopefully will see a file that looks like this:
  6. Now head over to Amazon Athena, which is an interactive query service that makes it easy to write and run ad-hoc queries on data stored in S3.
  7. Go to Saved queries, and run a sample query on the gameeventsdatabase that was generated by the one-click deployable solution (check the outputs tab in CloudFormation). The following example depicts this running on partition day 3 looking at the raw_events table.
  8. Go to Saved queries, and run a sample query on the gameeventsdatabase that was generated by the one-click deployable solution (check the outputs tab in CloudFormation). The following example depicts this running on partition day 3 looking at the raw_events table.
  9. If you are using a similar query above to the raw_events table and see your events results show for the day that you sent these events, for example day = ‘03’, you’ve successfully put your records into the stream.

Troubleshooting

Running into 400 Bad Request errors? Schema mis-matches? Anything else? Check the following:

  • Unable to put records – Make sure your IAM Role, under Identity & Access Management, in AWS has both PutRecord and PutRecords as indicated in step 2 of the Setup an Amazon Cognito Managed Identity Pool section.
  • Namespace errors – Make sure you have all the required .dlls. When in doubt, refer back to the AWS C++ SDK with Unreal Engine blog post to see which AWS SDK is required for methods the script might be calling that are missing.
  • 400 Bad Request – This usually indicates the request was not accepted, which means something went wrong with PutRecordsAsync in Step 4 or your IAM role has incorrect permissions. Double check your Put_Records code and your IAM role for Amazon Cognito in AWS.

Next steps

Fantastic! Now that you’ve successfully ingested custom data into your game analytics pipeline and into your S3 data lake, you have a world of endless possibilities for your game analytics events. From here we recommend building out additional template events using the event_data parameters that meet your game’s specific tracking needs, investigating the full capabilities of our game analytics pipeline solution, or setting up your own QuickSight dashboard.

Full Code Reference

MyActor.h

#pragma once

#include "CoreMinimal.h"
#include "GameFramework/Character.h"
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/utils/threading/Executor.h>
#include <aws/cognito-identity/CognitoIdentityClient.h>
#include <aws/cognito-identity/model/GetIdRequest.h>
#include <aws/cognito-identity/model/GetCredentialsForIdentityRequest.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/DescribeStreamRequest.h>
#include <aws/kinesis/model/DescribeStreamResult.h>
#include <aws/kinesis/model/GetRecordsRequest.h>
#include <aws/kinesis/model/GetRecordsResult.h>
#include <aws/kinesis/model/GetShardIteratorRequest.h>
#include <aws/kinesis/model/GetShardIteratorResult.h>
#include <aws/kinesis/model/Shard.h>
#include <aws/kinesis/model/PutRecordsResult.h>
#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/model/PutRecordsRequestEntry.h>
#include "MyActor.generated.h"

class UInputComponent;

UCLASS(config = Game)
class AMyActor : public ACharacter
{
    GENERATED_BODY()

public:
    AMyActor();

protected:
    virtual void BeginPlay();

protected:
    virtual void SetupPlayerInputComponent(UInputComponent *InputComponent) override;

private:
    // AWS SDK Configuration Variables
    static Aws::String AWS_ACCOUNT_ID;

    static Aws::String AWS_REGION;

    static Aws::String COGNITO_IDENTITY_POOL_ID;

    static FString GAP_APPLICATION_ID;

    static FString KINESIS_STREAM_NAME;

    static int BATCH_SIZE;

    // AWS SDK Non-Configuration Variables

    Aws::SDKOptions options;

    Aws::Auth::AWSCredentials *credentials;

    Aws::Kinesis::KinesisClient *kinesisClient;

    // Raw Event Records

    static TArray<TSharedPtr<FJsonObject>> raw_records;

    // Functions

    void CreateGameOverEvent(int8 wins, int8 losses);

    void CreateRecord(TSharedPtr<FJsonObject> event_data, FString event_name);

    void GenerateBatch(TSharedPtr<FJsonObject> record, FString partitionKey);

    void PutRecords(TArray<TSharedPtr<FJsonObject>> records, FString partitionKey);

    void OnPutRecordsAsyncOutcomeReceived(const Aws::Kinesis::KinesisClient *, const Aws::Kinesis::Model::PutRecordsRequest &, const Aws::Kinesis::Model::PutRecordsOutcome &outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &);

    FString FormatUUID(FString uuid);
};

MyActor.cpp

#include "MyActor.h"
#include "Components/InputComponent.h"
#include "GameFramework/InputSettings.h"

//////////////////////////////////////////////////////////////////////////
// MyActor

// Account ID
Aws::String AMyActor::AWS_ACCOUNT_ID = "INSERT_YOUR_AWS_ACCOUNT_ID_HERE";

// Region
Aws::String AMyActor::AWS_REGION = Aws::Region::INSERT_YOUR_AWS_REGION_HERE;

// Cognito Identity Pool
Aws::String AMyActor::COGNITO_IDENTITY_POOL_ID = "INSERT_THE_COGNITO_IDENTITY_POOL_ID_HERE";

// Application ID from the Solution
FString AMyActor::GAP_APPLICATION_ID = "INSERT_GAP_APPLICATION_ID_HERE";

// Kinesis Stream Name
FString AMyActor::KINESIS_STREAM_NAME = "INSERT_KINESIS_STREAM_ID_HERE";

// The number of records collected before a batch is sent to Amazon Kinesis
// Streams. In production this should be much higher, but for this demo
// script it is set to 4
int AMyActor::BATCH_SIZE = 4;

// A list that holds our records to batch them
TArray<TSharedPtr<FJsonObject>> AMyActor::raw_records;

// Example character constructor
AMyActor::AMyActor()
{
}

// Example character BeginPlay, connects to the cognito identity pool and retrieves AWS access keys
void AMyActor::BeginPlay()
{
    // Call the base class
    Super::BeginPlay();

    // Initialize AWS SDK API
    Aws::InitAPI(options);

    // Set AWS Client Configuration
    Aws::Client::ClientConfiguration clientConfig;
    clientConfig.region = AWS_REGION;

    // Grab AWS Cognito Identity Pool information
    Aws::String identityId;
    std::shared_ptr<Aws::CognitoIdentity::CognitoIdentityClient> cognitoIdentityClient = Aws::MakeShared<Aws::CognitoIdentity::CognitoIdentityClient>("CognitoIdentityClient", clientConfig);

    // Create and send request to Cognito to retrieve identity
    Aws::CognitoIdentity::Model::GetIdRequest getIdRequest;
    getIdRequest.SetAccountId(AWS_ACCOUNT_ID);
    getIdRequest.SetIdentityPoolId(COGNITO_IDENTITY_POOL_ID);
    Aws::CognitoIdentity::Model::GetIdOutcome getIdOutcome{cognitoIdentityClient->GetId(getIdRequest)};

    // If request to Cognito Identity Pool is success, retrieves the Access Key
    if (getIdOutcome.IsSuccess())
    {
        Aws::CognitoIdentity::Model::GetIdResult getIdResult{getIdOutcome.GetResult()};
        identityId = getIdResult.GetIdentityId();
    }
    // Uses Access Key to request and retrieve Secret Access Key
    Aws::CognitoIdentity::Model::GetCredentialsForIdentityRequest getCredsRequest;
    getCredsRequest.SetIdentityId(identityId);
    Aws::CognitoIdentity::Model::GetCredentialsForIdentityOutcome getCredsOutcome{cognitoIdentityClient->GetCredentialsForIdentity(getCredsRequest)};
    Aws::CognitoIdentity::Model::Credentials cognitoCredentials;
    if (getCredsOutcome.IsSuccess())
    {
        Aws::CognitoIdentity::Model::GetCredentialsForIdentityResult getCredsResult{getCredsOutcome.GetResult()};
        cognitoCredentials = getCredsResult.GetCredentials();
    }
    // Sets AWS Credentials based on Cognito Credentials
    credentials = new Aws::Auth::AWSCredentials(cognitoCredentials.GetAccessKeyId(), cognitoCredentials.GetSecretKey(), cognitoCredentials.GetSessionToken());

    // Optionally Print out the credentials information for debugging
    /*
    FString creds(credentials->GetAWSAccessKeyId().c_str());
    UE_LOG(LogTemp, Warning, TEXT("Credentials Access Key Id: %s"), *creds);
    FString credsS(credentials->GetAWSSecretKey().c_str());
    UE_LOG(LogTemp, Warning, TEXT("Credentials Secret Key: %s"), *credsS);
    FString credsX(credentials->GetSessionToken().c_str());
    UE_LOG(LogTemp, Warning, TEXT("Credentials Session Token: %s"), *credsX);
    */

    // Shut down the AWS SDK API, will be started back once records are being sent to Kinesis
    Aws::ShutdownAPI(options);
}

//////////////////////////////////////////////////////////////////////////// Input

void AMyActor::SetupPlayerInputComponent(class UInputComponent *PlayerInputComponent)
{
    // Set up gameplay key bindings
    check(PlayerInputComponent);

    // Create delegates to enter two parameters into event generator function
    DECLARE_DELEGATE_TwoParams(winParam, int8, int8);
    DECLARE_DELEGATE_TwoParams(loseParam, int8, int8);

    // Respond when our "Win" key is released.
    PlayerInputComponent->BindAction<winParam>("WinGame", IE_Released, this, &AMyActor::CreateGameOverEvent, (int8)1, (int8)0);
    // Respond when our "Lose" key is released.
    PlayerInputComponent->BindAction<loseParam>("LoseGame", IE_Released, this, &AMyActor::CreateGameOverEvent, (int8)0, (int8)1);
}

// Generates a win or loss event, encapsulates in json object as a game event, and sends to CreateRecord to add other event data and encapsulate as a full event record.
// For example, an event sent to GAP has top-level fields showing data like event timestamp, event name, etc. But the nested JSON will have the actual data specific to the event, such as GameOver event data.
void AMyActor::CreateGameOverEvent(int8 wins, int8 losses)
{
    // Create new JSON object
    TSharedPtr<FJsonObject> eventData = MakeShareable(new FJsonObject);
    // Set event data fields in JSON object
    eventData->SetNumberField("wins", wins);
    eventData->SetNumberField("losses", losses);
    eventData->SetStringField("platform", "UnrealEditor");
    // Create record with event data
    CreateRecord(eventData, "gameover");
}

// Create Record enriches event data with additional parameters as JSON object
void AMyActor::CreateRecord(TSharedPtr<FJsonObject> event_data, FString event_name)
{
    // Generating UUID using Unreal's UUID type. This is then converted to a format compatible with GAP.
    FString event_id = FGuid::NewGuid().ToString();
    // Grabbing current unix timestamp time in seconds
    int64 current_time = (int64)FDateTime::UtcNow().ToUnixTimestamp();
    // Create new JSON object
    TSharedPtr<FJsonObject> record = MakeShareable(new FJsonObject);
    // Set event data fields in JSON object
    record->SetStringField("event_id", FormatUUID(event_id));
    record->SetStringField("event_type", event_name);
    record->SetStringField("event_name", event_name);
    record->SetNumberField("event_timestamp", static_cast<double>(current_time));
    record->SetStringField("event_version", "1.0.0");
    record->SetStringField("app_version", "1.0.0");
    record->SetObjectField("event_data", event_data);
    // Add to the Batch of Records
    GenerateBatch(record, event_id);
}

void AMyActor::GenerateBatch(TSharedPtr<FJsonObject> record, FString partitionKey)
{
    // Append Raw Records with new Record
    // Create new JSON object
    TSharedPtr<FJsonObject> wrappedRecord = MakeShareable(new FJsonObject);
    // Set event data fields in JSON object
    wrappedRecord->SetObjectField("event", record);
    wrappedRecord->SetStringField("application_id", GAP_APPLICATION_ID);
    // Add to array of raw records
    raw_records.Add(wrappedRecord);
    // Debug message showing a record added to list with total list size
    UE_LOG(LogTemp, Warning, TEXT("Added record to list: %s"), *FString::FromInt(raw_records.Num()));
    // Once total list size reaches the batch size it will be sent to Kinesis
    if (raw_records.Num() >= BATCH_SIZE)
    {
        // Call Put Record
        PutRecords(raw_records, partitionKey);

        // Clears raw records after they are sent for demo.
        // In production, change to only clear on successful response.
        raw_records.Empty();
    }
}

// Puts a batch of records into Kinesis
void AMyActor::PutRecords(TArray<TSharedPtr<FJsonObject>> records, FString partitionKey)
{
    // Initialize AWS SDK API
    Aws::InitAPI(options);

    // Set AWS Client Configuration
    Aws::Client::ClientConfiguration clientConfig;
    clientConfig.region = AWS_REGION;

    // Initializes Kinesis Client
    kinesisClient = new Aws::Kinesis::KinesisClient(*credentials, clientConfig);

    // Set Kinesis Put Request information
    Aws::Kinesis::Model::PutRecordsRequest putRecordsRequest;
    const Aws::String awsStreamName(TCHAR_TO_UTF8(*KINESIS_STREAM_NAME));
    putRecordsRequest.SetStreamName(awsStreamName);

    // Iterate through batch records and add into Kinesis Put Records Request
    for (int8 i = records.Num() - 1; i >= 0; --i)
    {
        // Sets an AWS Kinesis single Put Request entry, which is encapsulated together into a batch to be sent in a single request
        Aws::Kinesis::Model::PutRecordsRequestEntry putRecordsRequestEntry;

        // Unreal Engine's Json Writer will serialize the data then convert the JSON object into this FString
        FString Result;
        TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Result);
        FJsonSerializer::Serialize(records[i].ToSharedRef(), Writer);

        // Optional Logging to show how the JSON format looks like before sent to Kinesis
        // UE_LOG(LogTemp, Warning, TEXT("RESULT STRING JSON: %s"), *Result);

        // FString is then converted to AWS StringStream for compatibility
        std::string const resultToString = TCHAR_TO_UTF8(*Result);
        Aws::StringStream data;
        data << resultToString;

        // AWS StringStream is converted into byte data to send to Kinesis
        Aws::Utils::ByteBuffer bytes((unsigned char *)resultToString.c_str(), resultToString.length());

        // Sets the single Put Records entry with data and partition key
        putRecordsRequestEntry.WithData(bytes).WithPartitionKey(TCHAR_TO_UTF8(*partitionKey));

        // Pushes the single Put Records entry to a batch of put records
        putRecordsRequest.AddRecords(putRecordsRequestEntry);
    }
    // Optional log showing that data formatting is completed successfully and now sending request to Kinesis
    UE_LOG(LogTemp, Warning, TEXT("Sending request to Kinesis"));
    // Sending an asynchronous request to Kinesis to put the batch of records
    kinesisClient->PutRecordsAsync(putRecordsRequest, std::bind(&AMyActor::OnPutRecordsAsyncOutcomeReceived, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
}

// Delegate called when the request to Kinesis returns a response
void AMyActor::OnPutRecordsAsyncOutcomeReceived(const Aws::Kinesis::KinesisClient *, const Aws::Kinesis::Model::PutRecordsRequest &, const Aws::Kinesis::Model::PutRecordsOutcome &outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &)
{
    // Called when the Kinesis Data Stream accepted the data
    if (outcome.IsSuccess())
    {
        UE_LOG(LogTemp, Warning, TEXT("Successfully sent to Kinesis!"));
    }
    // Called when the Kinesis Data Stream request returns an error, along with the message
    else
    {
        Aws::String message = outcome.GetError().GetMessage();
        FString fMessage = UTF8_TO_TCHAR(message.c_str());
        UE_LOG(LogTemp, Warning, TEXT("ERROR: Could not send to Kinesis. Reason: %s"), *fMessage);
    }
    // Shuts down the AWS API now that it is no longer used
    Aws::ShutdownAPI(options);
}

// Helper function to format Unreal Engine's UUID format to a format accepted by GAP
FString AMyActor::FormatUUID(FString uuid)
{
    return (uuid.Mid(0, 8).ToLower() + "-" + uuid.Mid(8, 4).ToLower() + "-" + uuid.Mid(12, 4).ToLower() + "-" + uuid.Mid(16, 4).ToLower() + "-" + uuid.Mid(20, 12).ToLower());
}