Writing data to Amorphic Datalake

The Amorphic platform has the concept of a Landing Zone (LZ) and a Data Landing Zone (DLZ), and data can only be written to the Landing Zone. To ingest data into Amorphic, write it to the proper location in the landing zone using the classes described below.
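
As a quick orientation, a minimal end-to-end sketch from a python-shell job is shown below (the same call is documented in detail in the sections that follow); the bucket name, domain, dataset, and user id are illustrative and assume the dataset already exists with write access for that user:

>>> import pandas as pd
>>> from amorphicutils.python.write import Write
>>> pandas_df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
>>> writer = Write("lz_bucket")
>>> response = writer.write_csv_data(pandas_df, "testdomain", "testdataset", user="userid", file_type="csv")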

Writing in python-shell

Writes a pandas dataframe to the Amorphic landing zone.

class amorphicutils.python.write.Write(bucket_name, region=None)

Class to write data to Amorphic

__init__(bucket_name, region=None)

Initialize the Write object

Parameters

bucket_name – name of the landing zone bucket

region – (Optional) AWS region of the bucket

>>> writer = Write("lz_bucket")
write_bytes_data(data_bytes, domain_name, dataset_name, user, file_type, upload_date=None, full_reload=False, path=None, file_name=None)

Writes bytes data to the datalake

Parameters
  • data_bytes – data bytes to write

  • domain_name – domain name for dataset

  • dataset_name – dataset name

  • user – username with write access to dataset

  • file_type – file type of the dataset

  • upload_date – Upload timestamp from time.time(). If not supplied, the current timestamp is used

  • full_reload – True if the table type is of reload type, Default: False

  • path – (Optional) Path where data is stored. Implicit creation of path will be ignored

  • file_name – Name of the file

Returns

dict with exitcode and message
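
A minimal usage sketch for write_bytes_data, assuming a local CSV file read as raw bytes; the file name, bucket, domain, dataset, and user id are illustrative:

>>> from amorphicutils.python.write import Write
>>> with open("local_file.csv", "rb") as fh:
...     data_bytes = fh.read()
>>> writer = Write("lz_bucket")
>>> response = writer.write_bytes_data(data_bytes, "testdomain", "testdataset", user="userid", file_type="csv")
>>> print(response)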

write_csv_data(data, domain_name, dataset_name, user, header=True, file_type='csv', quote=True, delimiter=',', upload_date=None, full_reload=False, path=None, file_name=None, **kwargs)

Write data to lz bucket

Parameters
  • data – pandas dataframe of data

  • domain_name – domain name for dataset

  • dataset_name – dataset name

  • user – username with write access to dataset

  • header – True if you want to save the file with a header. Default: True

  • file_type – file type for dataset

  • quote – True if you want to save your data with quoted characters. Default: True

  • delimiter – Delimiter to use to save to s3. Default: ,

  • upload_date – Upload timestamp from time.time(). If not supplied, the current timestamp is used

  • full_reload – True if the table type is of reload type, Default: False

  • path – (Optional) Path where data is stored. Implicit creation of path will be ignored

  • file_name – Name of the file

  • kwargs – Optional arguments available for the underlying CSV write

Returns

dict with exitcode and message

>>> writer = Write("lz_bucket")
>>> response = writer.write_csv_data(pandas_df, "testdomain", "testdataset", user="userid", file_type="csv")
>>> print(response)
{
  "exitcode": 0,
  "message": "This is success message"
}
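
A sketch of a full reload with this method, assuming the target dataset is of reload type; per the parameter notes above, upload_date is taken from time.time(), and the dataframe contents are illustrative:

>>> import time
>>> import pandas as pd
>>> pandas_df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
>>> writer = Write("lz_bucket")
>>> response = writer.write_csv_data(pandas_df, "testdomain", "testdataset", user="userid",
...                                  file_type="csv", upload_date=time.time(), full_reload=True)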

Writing in pyspark

Writes a Spark dataframe to the Amorphic landing zone.

class amorphicutils.pyspark.write.Write(bucket_name, spark, region=None)

Class to write data to Amorphic

__init__(bucket_name, spark, region=None)

Initialize the Write object

Parameters
  • bucket_name – Name of the landing zone bucket

  • spark – SparkContext

  • region – (Optional) AWS region of the bucket

>>> writer = Write("lz_bucket", spark_object)
write_csv_data(data, domain_name, dataset_name, user, header=False, file_type='csv', quote=True, delimiter=',', upload_date=None, full_reload=False, path=None, reload_wait=30, **kwargs)

Write data to lz bucket

Parameters
  • data – spark dataframe of data

  • domain_name – domain name for dataset

  • dataset_name – dataset name

  • user – username with write access to dataset

  • header – True if you want to save the file with a header. Default: False

  • file_type – file type for dataset

  • quote – True if you want to save your data with quoted characters. Default: True

  • delimiter – The delimiter to use while storing file, Default: ,

  • upload_date – Upload timestamp from time.time(). If not supplied, the current timestamp is used

  • full_reload – True if the table type is of reload type, Default: False

  • path – (Optional) Path where data is stored. Implicit creation of path will be ignored

  • reload_wait – Time in seconds to wait before triggering the reload. Default: 30

  • kwargs – Optional arguments available for pyspark write

Returns

dict with exitcode and message

>>> writer = Write("lz_bucket", spark_object)
>>> response = writer.write_csv_data(spark_df, "testdomain", "testdataset", user="userid", file_type="csv")
>>> print(response)
{
  "exitcode": 0,
  "message": "This is success message"
}
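
A fuller sketch that also builds the dataframe, assuming spark_object is a SparkSession (the constructor's spark argument); the rows and column names are illustrative:

>>> spark_df = spark_object.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])
>>> writer = Write("lz_bucket", spark_object)
>>> response = writer.write_csv_data(spark_df, "testdomain", "testdataset", user="userid",
...                                  header=True, file_type="csv")
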
write_json(data, domain_name, dataset_name, user, upload_date=None, file_type='others', full_reload=False, path=None, reload_wait=30, **json_kwargs)

Write data to lz bucket

Parameters
  • data – spark dataframe of data

  • domain_name – domain name for dataset

  • dataset_name – dataset name

  • user – username with write access to dataset

  • upload_date – Upload timestamp from time.time(). If not supplied, the current timestamp is used

  • file_type – file type for dataset, Default: others

  • full_reload – True if the table type is of reload type, Default: False

  • path – (Optional) Path where data is stored. Implicit creation of path will be ignored

  • json_kwargs – Optional arguments available for pyspark write

  • reload_wait – Time in seconds to wait before triggering the reload. Default: 30

Returns

dict with exitcode and message

>>> writer = Write("lz_bucket", spark_object)
>>> response = writer.write_json(spark_df, "testdomain", "testdataset", user="userid")
>>> print(response)
{
  "exitcode": 0,
  "message": "This is success message"
}
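
A sketch passing an optional pyspark writer argument through json_kwargs, assuming those keyword arguments are forwarded to the pyspark JSON writer; compression is a standard pyspark write option and the identifiers are illustrative:

>>> writer = Write("lz_bucket", spark_object)
>>> response = writer.write_json(spark_df, "testdomain", "testdataset", user="userid", compression="gzip")
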
write_parquet(data, domain_name, dataset_name, user, upload_date=None, file_type='parquet', full_reload=False, path=None, reload_wait=30, **kwargs)

Write data to lz bucket

Parameters
  • data – spark dataframe of data

  • domain_name – domain name for dataset

  • dataset_name – dataset name

  • user – username with write access to dataset

  • upload_date – Upload timestamp from time.time(). If not supplied, the current timestamp is used

  • file_type – file type for dataset, Default: parquet

  • full_reload – True if the table type is of reload type, Default: False

  • path – (Optional) Path where data is stored. Implicit creation of path will be ignored

  • reload_wait – Time in seconds to wait before triggering the reload. Default: 30

  • kwargs – Optional arguments available for pyspark write

Returns

dict with exitcode and message

>>> writer = Write("lz_bucket", spark_object)
>>> response = writer.write_parquet(spark_df, "testdomain", "testdataset", user="userid")
>>> print(response)
{
  "exitcode": 0,
  "message": "This is success message"
}
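
A sketch of a full reload with a longer reload wait, assuming the target dataset is of reload type; the wait value and identifiers are illustrative:

>>> writer = Write("lz_bucket", spark_object)
>>> response = writer.write_parquet(spark_df, "testdomain", "testdataset", user="userid",
...                                 full_reload=True, reload_wait=60)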