Bookmarking in Amorphic

Many ETL jobs depend on one another through shared datasets, so their state must be kept in a persistent location where it can be read later and used to synchronise the jobs. This package provides a utility that stores state in Amorphic with S3 as the backend. The data is stored in S3 as a dictionary of key-value pairs. A value can be a dict, a list, or any primitive type.
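To make the shape of the stored data concrete, here is a minimal sketch of a state dictionary that mixes the value types mentioned above. The keys and values are invented for illustration, and JSON is assumed as the serialization format only because the stored files carry a .json suffix.

```python
import json

# Illustrative state for one job; keys and values are made up for this sketch.
state = {
    "sales.orders": {"epoch_time": 156234987},         # dict value
    "sales.customers": ["file_1.csv", "file_2.csv"],   # list value
    "last_run_status": "success",                      # primitive value
}

# Assumption: the state is serialized as JSON before it is written to S3.
serialized = json.dumps(state)
restored = json.loads(serialized)
```

Values that JSON cannot represent (for example datetime objects or sets) would need to be converted first, for instance to an epoch timestamp or a list.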

The state is stored per job name, so use the same job_name consistently for the corresponding state data.

Usage

  1. Pre-requisite
    • Create a dataset backed by S3 to store the state data.

  2. Initialize the state management class and get a StateData instance

from amorphicutils import statemanagement
state_mgm = statemanagement.StateStore(lz_bucket, dlz_bucket, state_domain, state_dataset, username)
state_data = state_mgm.get_or_create(test_job_name)

test_job_name is the name of the job whose state is being maintained.

  3. Add a new element to the state data

state_data.add_element(test_statedata_key_1, test_statedata_value_1)

The add_element method adds a new key and value to the existing StateData object. The key should be unique, so <domain name>.<dataset name> is a good choice for the key.
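The key convention suggested above can be captured in a small helper. This is only a sketch: make_state_key is a hypothetical function, not part of amorphicutils, and the domain and dataset names are placeholders.

```python
def make_state_key(domain_name, dataset_name):
    """Build a '<domain name>.<dataset name>' key (hypothetical helper)."""
    if not domain_name or not dataset_name:
        raise ValueError("domain_name and dataset_name must be non-empty")
    return f"{domain_name}.{dataset_name}"


# Placeholder names; use the domain and dataset the job actually reads.
key = make_state_key("sales", "orders")
```

The resulting key could then be passed as the first argument to state_data.add_element.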

  4. Save the state data in persistent storage

state_mgm.update(state_data)

The update method stores the data in S3 with the suffix _running.json, which indicates that the jobs related to this state store are not yet complete and the state has only been saved to the persistent location.

  5. Complete the state data

state_mgm.complete(state_data)

This stores the final copy of state_data in persistent storage with the suffix _completed.json.
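The two-phase naming described in the update and complete steps can be sketched without any AWS access. Everything here is an assumption made for illustration: the in-memory dictionary stands in for the S3-backed dataset, save_state is a hypothetical function, and the object name is assumed to be the job name followed by the suffix. Whether the library removes the running file on completion is not stated, so this sketch keeps both.

```python
import json

# Hypothetical in-memory stand-in for the S3-backed dataset; not the amorphicutils API.
fake_bucket = {}


def save_state(job_name, state, completed=False):
    """Store the state under a job-specific name with a lifecycle suffix."""
    suffix = "_completed.json" if completed else "_running.json"
    object_name = job_name + suffix  # assumed naming: <job name><suffix>
    fake_bucket[object_name] = json.dumps(state).encode("utf-8")
    return object_name


state = {"sales.orders": {"epoch_time": 156234987}}

# Mirrors state_mgm.update(state_data): intermediate copy while jobs are still running.
running_name = save_state("test_job", state)

# Mirrors state_mgm.complete(state_data): final copy once the jobs have finished.
completed_name = save_state("test_job", state, completed=True)
```

Writing an intermediate copy first lets a dependent job tell an in-progress run apart from a finished one by looking at the suffix alone.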

Implementation

State Data

This is the object which stores the data in key-value format.

class amorphicutils.tools.statemanagement.StateData

Class to store the state data. It stores the data as a dictionary.

__init__()

Initialize the StateData class

>>> state_data = StateData()

add_element(key, value)

Add element to the state data

Parameters
  • key – key of the state data

  • value – value of the state data

Returns

>>> state_data = StateData()
>>> state_data.add_element("DatasetId", {"epoch_time": 156234987})

from_bytes(data)

Load the dict of data into state data class

Parameters

data – dict of data to be loaded

Returns

>>> state_data = StateData()
>>> state_data.from_bytes({"DatasetId": {"epoch_time": 156234678}})

remove_element(key)

Removes the element from state data

Parameters

key – key of the state data

Returns

>>> state_data = StateData()
>>> state_data.remove_element("DatasetId")

reset()

Resets the data of the StateData class

to_bytes()

Returns the state data in bytes format

Returns

bytes of state data

>>> state_data.to_bytes()
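
To show how these methods fit together, the following is a minimal in-memory sketch of a class with the same method names. It is not the amorphicutils implementation: the class name InMemoryStateData is hypothetical, UTF-8 encoded JSON is an assumed byte format, and silently ignoring a missing key in remove_element is an assumed behaviour.

```python
import json


class InMemoryStateData:
    """Hypothetical stand-in for StateData; not the amorphicutils implementation."""

    def __init__(self):
        self._data = {}

    def add_element(self, key, value):
        # Adding an existing key overwrites its value.
        self._data[key] = value

    def remove_element(self, key):
        # Assumed behaviour: removing a missing key is a no-op.
        self._data.pop(key, None)

    def reset(self):
        self._data = {}

    def to_bytes(self):
        # Assumed format: UTF-8 encoded JSON.
        return json.dumps(self._data).encode("utf-8")


state_data = InMemoryStateData()
state_data.add_element("DatasetId", {"epoch_time": 156234987})
state_data.add_element("Other", [1, 2])
state_data.remove_element("Other")
payload = state_data.to_bytes()
```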

State Store

This object interacts with Amorphic to update the dataset.

class amorphicutils.tools.statemanagement.StateStore(lz_bucket_name, dlz_bucket_name, state_domain, state_dataset_name, username, file_type='others')

Stores the state of Amorphic ETL jobs

__init__(lz_bucket_name, dlz_bucket_name, state_domain, state_dataset_name, username, file_type='others')

Initialize the StateStore class

Parameters
  • lz_bucket_name – bucket name of lz bucket

  • dlz_bucket_name – bucket name of dlz bucket

  • state_domain – domain of the state dataset

  • state_dataset_name – dataset name of the state dataset

  • username – user id of the user who has access to state data

  • file_type – file type of the state dataset. Default: “others”

>>> state_store = StateStore("lz_bucket", "dlz_bucket", "test_state_domain",
                             "test_state_dataset", "test_user", "others")

complete(data)

Saves the file with completed tag

Parameters

data – StateData instance whose contents will be stored in S3

Returns

>>> state_store = StateStore("lz_bucket", "dlz_bucket", "test_state_domain",
                             "test_state_dataset", "test_user", "others")
>>> state_data = state_store.get_or_create("test_job")
>>> state_data.add_element("DatasetId", {"epoch_time": 156234987})
>>> state_store.complete(state_data)

get_or_create(job_name)

Gets or creates a StateData instance

Parameters

job_name – name of the job for which to maintain state

Returns

instance of StateData

>>> state_store = StateStore("lz_bucket", "dlz_bucket", "test_state_domain",
                             "test_state_dataset", "test_user", "others")
>>> state_data = state_store.get_or_create("test_job")

update(data)

Saves the file with the running tag in S3

Parameters

data – StateData instance whose contents will be stored in S3

Returns

>>> state_store = StateStore("lz_bucket", "dlz_bucket", "test_state_domain",
                             "test_state_dataset", "test_user", "others")
>>> state_data = state_store.get_or_create("test_job")
>>> state_data.add_element("DatasetId", {"epoch_time": 156234987})
>>> state_store.update(state_data)