Datalake Util

Datalake util provides functionality to query the state of the platform.

Dataload success check

Amorphic uses a multi-level data flow to ensure that data written to a dataset matches the expected schema. First, the user writes the data to the Landing Zone bucket (lz bucket) of the respective dataset. The data is then loaded into a temporary table of the data warehouse (if the dataset is stored in a data warehouse) to validate the schema. On success, the data is copied to the Data Landing Zone bucket (dlz bucket), from which it is copied to the final table of the data warehouse. So we only need to check whether the data has reached the dlz bucket: if it has, we can be sure it will be copied successfully to the final target.
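The check described above boils down to a prefix lookup in the dlz bucket. The sketch below illustrates that idea only; the key layout `<domain>/<dataset>/<upload_epoch>/` and the `is_load_complete`/`list_keys` names are hypothetical, and the listing callable stands in for an S3 list call (e.g. via boto3).

```python
def is_load_complete(list_keys, domain, dataset, upload_epoch):
    """Return True if any object exists under the dlz prefix for the
    given upload epoch, i.e. the load has reached the dlz bucket.

    list_keys is a zero-argument callable returning the bucket's keys;
    in practice this would wrap an S3 listing call. The path layout
    below is an assumption for illustration.
    """
    prefix = "{}/{}/{}/".format(domain, dataset, upload_epoch)
    return any(key.startswith(prefix) for key in list_keys())
```

With boto3, `list_keys` could be a small wrapper around `list_objects_v2` on the dlz bucket; the actual library encapsulates this in `is_data_load_complete`.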

Usage

To check whether data was copied successfully:

  • Initialize DataLandingZoneUtil

from amorphicutils import datalakeutil
datalake_util = datalakeutil.DataLandingZoneUtil(lz_bucket, dlz_bucket)
  • Check whether the data load succeeded

datalake_util.is_data_load_complete(domain, dataset, upload_time)
  • Use the bool_retry decorator to check the status with retries

@bool_retry(3, 2, True)
def check_if_data_load_complete(domain, dataset, upload_time):
    return datalake_util.is_data_load_complete(domain, dataset, upload_time)

result = check_if_data_load_complete(self.domain, self.dataset, self.upload_time)

bool_retry accepts retry_count, sleep_time (in seconds), and the expected outcome of your function (True or False). In the example above it retries at most 3 times with a delay of 2 seconds between checks, and succeeds as soon as check_if_data_load_complete returns True.
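A minimal sketch of how such a decorator could be implemented, to clarify the semantics described above. This is an illustration of the documented behaviour, not the actual amorphicutils implementation:

```python
import functools
import time


def bool_retry(retry_count, sleep_time, expected):
    """Retry the wrapped function up to retry_count times, sleeping
    sleep_time seconds between attempts, until it returns `expected`.
    Returns the last result if no attempt matches."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = None
            for attempt in range(retry_count):
                result = func(*args, **kwargs)
                if result == expected:
                    return result
                if attempt < retry_count - 1:
                    time.sleep(sleep_time)
            return result
        return wrapper
    return decorator
```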

  • Get last upload_date epoch folder

datalake_util = datalakeutil.DataLandingZoneUtil(<lz_bucket>, <dlz_bucket>)
last_epoch = datalake_util.get_last_upload_date_folder_epoch(<domain_name>, <dataset_name>)
  • Get list of epoch folders

datalake_util = datalakeutil.DataLandingZoneUtil(<lz_bucket>, <dlz_bucket>)
last_epoch = datalake_util.get_epochs_list(<domain_name>, <dataset_name>)
  • Get new epoch folder

datalake_util = datalakeutil.DataLandingZoneUtil(<lz_bucket>, <dlz_bucket>)
new_epochs_list = datalake_util.get_new_epochs(<domain_name>, <dataset_name>, <upload_time>)
  • Get new files newer than an upload_date epoch

datalake_util = datalakeutil.DataLandingZoneUtil(<lz_bucket>, <dlz_bucket>)
last_epoch = datalake_util.get_new_files_prefix(<domain_name>, <dataset_name>, last_processed_epoch=<upload_time>)
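The epoch helpers above all come down to filtering the dataset's upload_date folders against the last processed epoch. A minimal local sketch of that filtering, assuming epochs are integers and "new" means strictly greater than last_processed_epoch (the function name here is illustrative, not the library API):

```python
def new_epochs(epochs, last_processed_epoch=None):
    """Return the epochs strictly greater than last_processed_epoch,
    sorted ascending; all epochs when nothing was processed yet.
    Mirrors the documented behaviour of get_new_epochs."""
    if last_processed_epoch is None:
        return sorted(epochs)
    return sorted(e for e in epochs if e > last_processed_epoch)
```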

Implementation

class amorphicutils.datalakeutil.DataLandingZoneUtil(lz_bucket_name, dlz_bucket_name, region=None)

Util class to get state of DLZ

__init__(lz_bucket_name, dlz_bucket_name, region=None)

Initialize DLZ Util

Parameters
  • lz_bucket_name – Lz bucket name

  • dlz_bucket_name – Dlz bucket name

  • region – AWS region (optional)

get_epochs_list(domain_name, dataset_name)

Returns the list of epochs

Parameters
  • domain_name – domain name of dataset

  • dataset_name – dataset name

Returns

list of epochs

get_last_upload_date_folder_epoch(domain_name, dataset_name)

Get the epoch of the folder of the last upload

Parameters
  • domain_name – domain name of dataset

  • dataset_name – dataset name

Returns

epoch of the last upload folder

get_new_epochs(domain_name, dataset_name, last_processed_epoch=None)

Returns the list of epochs which are greater than last_processed_epoch

Parameters
  • domain_name – domain name of dataset

  • dataset_name – dataset name

  • last_processed_epoch – last epoch processed

Returns

list of new epochs

get_new_files_prefix(domain_name, dataset_name, last_processed_epoch=None)

Returns the list of file locations whose epoch is greater than last_processed_epoch.

Parameters
  • domain_name – domain name of dataset

  • dataset_name – dataset name

  • last_processed_epoch – last epoch processed

Returns

list of new file locations

class amorphicutils.datalandingzonestate.DataLandingZoneState(lz_bucket_name, dlz_bucket_name)

Class to check state of DataLandingZone

__init__(lz_bucket_name, dlz_bucket_name)

Initialize the DataLandingZoneState object

Parameters
  • lz_bucket_name – Lz bucket name

  • dlz_bucket_name – Dlz bucket name

is_data_load_complete(domain_name, dataset_name, upload_date, region=None)

Return True if data load is complete

Parameters
  • domain_name – domain name

  • dataset_name – dataset name

  • upload_date – upload date to check if exists

  • region – AWS region

Returns

True or False

static is_epoch_dir_exists(bucket_name, domain_name, dataset_name, upload_date, path=None, region=None)

Returns True if the directory with the given upload date exists

Parameters
  • bucket_name – name of the bucket

  • domain_name – domain name

  • dataset_name – dataset name

  • upload_date – upload date to check if exists

  • path – path for which to check dir

  • region – AWS region

Returns

True or False