Reading data from Amorphic Datalake

Amorphic Datalake stores data in many formats, backed either by a relational database (Redshift/Aurora, for structured data only) or by S3 object storage. The platform stores the data under a fixed structure for better organization.

The Read class lets you orchestrate reading this data in a more elegant way. You can use either Python or PySpark as the processing backend.
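
Depending on the processing backend, import the matching Read class (module paths as documented below); for example:

>>> # python-shell jobs: pandas-based reader
>>> from amorphicutils.python.read import Read
>>> # pyspark jobs: Spark-based reader (aliased here only to tell the two apart)
>>> from amorphicutils.pyspark.read import Read as SparkRead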

Reading in python-shell

The following class returns a pandas dataframe of the data.

Reading from S3

class amorphicutils.python.read.Read(bucket_name, region=None)

Class to read data from Amorphic

__init__(bucket_name, region=None)

Initialize the class with dataset-specific details

Parameters

bucket_name – name of the bucket

>>> reader = Read("dlz_bucket")
list_object(domain_name, dataset_name)

List the objects for a specific dataset

Parameters
  • domain_name – domain name of the dataset

  • dataset_name – dataset name

Returns

list of objects from s3

>>> reader = Read("dlz_bucket")
>>> reader.list_object("testdomain", "testdataset")
read_csv_data(domain_name, dataset_name, schema=None, header=False, delimiter=',', upload_date=None, path=None, **kwargs)

Read CSV data from S3 using the pandas read API and return a pandas dataframe

Parameters
  • domain_name – domain name of the dataset

  • dataset_name – dataset name

  • schema – List of column names of the data. Type: list(str)

  • header – True if the data files contain a header. Default: False

  • delimiter – delimiter in the dataset. Default: ","

  • upload_date – upload date timestamp.

  • path – Path of the file to read from.

  • kwargs – Optional arguments available for the pandas CSV read

Returns

pandas dataframe of data from dataset

>>> reader = Read("dlz_bucket")
>>> df = reader.read_csv_data("testdomain", "testdataset", upload_date="1578305347")
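
A slightly fuller sketch: the column names are hypothetical, and the extra nrows keyword is a standard pandas read_csv option passed through via kwargs as described above.

>>> reader = Read("dlz_bucket")
>>> cols = ["id", "name", "amount"]           # hypothetical column names
>>> df = reader.read_csv_data("testdomain", "testdataset",
...                           schema=cols, header=False,
...                           delimiter="|", nrows=1000)
>>> df.head()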
read_excel(domain_name, dataset_name, sheet_name=0, header=False, schema=None, upload_date=None, path=None, **kwargs)

Read data from Excel files and return a pandas dataframe

Parameters
  • domain_name – domain name of the dataset

  • dataset_name – dataset name

  • sheet_name – sheet name or indices to read data from. Default: 0

  • header – True if the data files contain a header. Default: False

  • schema – List of column names of the data.

  • upload_date – upload date timestamp.

  • path – Path of the file to read from.

  • kwargs – Optional arguments available for the pandas Excel read

Returns

pandas dataframe of data from dataset

>>> amorphic_reader = Read(bucket_name="dlz_bucket")
>>> result = amorphic_reader.read_excel(domain_name="testdomain", dataset_name="testdataset", header=True)
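
Sheets can also be selected by name; "Sheet1" below is a hypothetical sheet name:

>>> amorphic_reader = Read(bucket_name="dlz_bucket")
>>> result = amorphic_reader.read_excel(domain_name="testdomain",
...                                     dataset_name="testdataset",
...                                     sheet_name="Sheet1", header=True)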
read_json(domain_name, dataset_name, upload_date=None, path=None, **kwargs)

Read data from JSON files and return a pandas dataframe

Parameters
  • domain_name – domain name of the dataset

  • dataset_name – dataset name

  • upload_date – upload date timestamp.

  • path – Path of the file to read from.

  • kwargs – Optional arguments available for the pandas JSON read

Returns

pandas dataframe of data from dataset

>>> amorphic_reader = Read(bucket_name="dlz_bucket")
>>> result = amorphic_reader.read_json(domain_name="testdomain", dataset_name="testdataset")
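
Extra keywords are passed to the pandas JSON reader; for example, lines=True (a standard pandas read_json option) for newline-delimited JSON:

>>> amorphic_reader = Read(bucket_name="dlz_bucket")
>>> result = amorphic_reader.read_json(domain_name="testdomain",
...                                    dataset_name="testdataset",
...                                    lines=True)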

Reading in pyspark

The following class returns a Spark dataframe of the data.
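
The spark_context used in the examples below can be created as follows when the code runs inside an AWS Glue PySpark job (a sketch; in a plain PySpark job the GlueContext line is not needed):

>>> from pyspark.context import SparkContext
>>> from awsglue.context import GlueContext
>>> spark_context = SparkContext.getOrCreate()
>>> glue_context = GlueContext(spark_context)   # needed later for DwhRead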

Reading from S3

class amorphicutils.pyspark.read.Read(bucket_name, spark, region=None)

Class to read data from Amorphic

__init__(bucket_name, spark, region=None)

Initialize the class with dataset-specific details

Parameters
  • bucket_name – name of the bucket (dlz)

  • spark – SparkContext

>>> reader = Read("dlz_bucket", spark_context)
list_object(domain_name, dataset_name)

List the objects for a specific dataset

Parameters
  • domain_name – domain name of the dataset

  • dataset_name – dataset name

Returns

list of objects from s3

>>> reader = Read("dlz_bucket", spark=spark_context)
>>> reader.list_object("testdomain", "testdataset")
read_csv_data(domain_name, dataset_name, schema=None, header=False, delimiter=',', multline=True, upload_date=None, path=None, **kwargs)

Read CSV data from S3 using the Spark read API and return a spark dataframe

Parameters
  • domain_name – domain name of the dataset

  • dataset_name – dataset name

  • schema – Schema of the data. Type: StructType([StructField()])

  • header – True if the data files contain a header. Default: False

  • delimiter – delimiter in the dataset. Default: ","

  • multline – Read records whose column values may span multiple lines. Default: True

  • upload_date – upload date timestamp. Ignored if full_load is True

  • path – Path of the file. Optional: if given, implicit prefix creation is ignored

  • kwargs – Optional arguments available for the PySpark read

Returns

spark dataframe of data from dataset

>>> reader = Read("dlz_bucket", spark=spark_context)
>>> df = reader.read_csv_data("testdomain", "testdataset")
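
An explicit schema can be supplied as a StructType (per the schema type documented above); the field names below are hypothetical:

>>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
>>> schema = StructType([
...     StructField("id", IntegerType(), True),
...     StructField("name", StringType(), True)
... ])
>>> reader = Read("dlz_bucket", spark=spark_context)
>>> df = reader.read_csv_data("testdomain", "testdataset",
...                           schema=schema, header=True, delimiter="|")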
read_json(domain_name, dataset_name, upload_date=None, path=None, **json_kwargs)

Read JSON data from S3 using the Spark read API and return a spark dataframe

Parameters
  • domain_name – domain name of the dataset

  • dataset_name – dataset name

  • upload_date – upload date timestamp. Ignored if full_load is True

  • path – Path of the file. Optional: if given, implicit prefix creation is ignored

  • json_kwargs – Optional arguments available for the PySpark JSON read

Returns

spark dataframe of data from dataset

>>> reader = Read("dlz_bucket", spark=spark_context)
>>> df = reader.read_json("testdomain", "testdataset")
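
The read can also be limited to a single upload via upload_date; the timestamp below is only illustrative:

>>> reader = Read("dlz_bucket", spark=spark_context)
>>> df = reader.read_json("testdomain", "testdataset", upload_date="1578305347")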
read_parquet(domain_name, dataset_name, upload_date=None, path=None, **parquet_kwargs)

Read Parquet data from S3 using the Spark read API and return a spark dataframe

Parameters
  • domain_name – domain name of the dataset

  • dataset_name – dataset name

  • upload_date – upload date timestamp. Ignored if full_load is True

  • path – Path of the file. Optional: if given, implicit prefix creation is ignored

  • parquet_kwargs – Optional arguments available for the PySpark Parquet read

Returns

spark dataframe of data from dataset

>>> reader = Read("dlz_bucket", spark=spark_context)
>>> df = reader.read_parquet("testdomain", "testdataset")
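
Since the result is a regular spark dataframe, the usual dataframe operations apply:

>>> reader = Read("dlz_bucket", spark=spark_context)
>>> df = reader.read_parquet("testdomain", "testdataset")
>>> df.printSchema()
>>> df.count()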

Reading from Data Warehouse

class amorphicutils.pyspark.read.DwhRead(dwh_type, dwh_host, dwh_port, dwh_db, dwh_user, dwh_pass, tmp_dir)

Class to read data from the data warehouse (Redshift/Aurora)

__init__(dwh_type, dwh_host, dwh_port, dwh_db, dwh_user, dwh_pass, tmp_dir)

Initialize class with required parameters for connecting to data warehouse.

Parameters
  • dwh_type – Type of data warehouse: "redshift" or "aurora"

  • dwh_host – Hostname for DWH

  • dwh_port – Port for DWH

  • dwh_db – Database name to connect to, e.g. cdap

  • dwh_user – Username to use for connection

  • dwh_pass – Password for the user

  • tmp_dir – Temp directory to store intermediate results

read_from_redshift(glue_context, domain_name, dataset_name, **kwargs)

Return response with data from Redshift

Parameters
  • glue_context – GlueContext

  • domain_name – Domain name of dataset

  • dataset_name – Dataset name

  • kwargs – Extra params like: hashfield

Returns

response with data from Redshift

>>> dwh_reader = DwhRead("redshift", DWH_HOST, DWH_PORT, DWH_DB, dwh_user, dwh_pass, tmp_dir)
>>> response = dwh_reader.read_from_redshift(glue_context, domain_name, dataset_name)
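
Putting it together inside a Glue PySpark job (a sketch; the GlueContext setup and the hashfield value are illustrative, and DWH_HOST, DWH_PORT, DWH_DB, dwh_user, dwh_pass and tmp_dir are assumed to be defined as above):

>>> from pyspark.context import SparkContext
>>> from awsglue.context import GlueContext
>>> glue_context = GlueContext(SparkContext.getOrCreate())
>>> dwh_reader = DwhRead("redshift", DWH_HOST, DWH_PORT, DWH_DB,
...                      dwh_user, dwh_pass, tmp_dir)
>>> response = dwh_reader.read_from_redshift(glue_context, "testdomain",
...                                          "testdataset", hashfield="id")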