Getting Started

To start developing on Amorphic, you need an entry point that matches the backend, which can be either Python or PySpark.

Python

The following sample main file can be used as a reference for the app entry point.

import sys
import argparse
import logging

# Module-level logger that lets every record from DEBUG upwards through.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Run the default logging configuration so the records above have a handler.
logging.basicConfig()


def main(app_name=None):
    """Entry point for the application logic.

    Logs the app name and then runs the user-supplied logic placed in the
    marked section below.

    Args:
        app_name: Name of the app; only used for logging in this template.

    Raises:
        Exception: Any exception raised by the application logic is logged
            and then re-raised unchanged, so the process exits with a
            failure status instead of silently reporting success.
    """
    try:
        logger.info("App Name: {0}".format(app_name))

        ####################################
        ############ Your Logic ############
        ####################################

    except Exception as ex:
        logger.error("Failed to execute main with error: {0}".format(str(ex)))
        # Re-raise with the original type and traceback; swallowing the error
        # here would make a failed run look like a successful one.
        raise


def parse_arguments():
    """Parse the command-line options for the Python entry point.

    Recognises ``--scriptLocation`` and ``--app_name``; any other options on
    the command line are ignored. When neither option carries a value, an
    error is logged and the process exits with status 1.

    Returns:
        dict: Parsed values keyed by ``scriptLocation`` and ``app_name``
        (``None`` for an option that was not supplied).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--scriptLocation", help="Script location in aws glue, this is internal arguments given to glue execution")
    parser.add_argument("--app_name", help="Name of the app when launched locally")

    # parse_known_args tolerates extra options instead of aborting on them.
    namespace, _ = parser.parse_known_args()
    arguments = vars(namespace)

    # Without a script location the app name must be given explicitly.
    if not (arguments.get("scriptLocation") or arguments.get("app_name")):
        logger.error("--app_name is required as arguments when running locally.")
        sys.exit(1)
    return arguments


if __name__ == '__main__':
    args = parse_arguments()
    # No explicit app name: fall back to the last path segment of the
    # script location.
    args['app_name'] = args.get('app_name') or args['scriptLocation'].rsplit('/', 1)[-1]
    print(args)

    main(args['app_name'])

Pyspark

A Spark job can be executed locally or in the AWS Glue environment. When Spark runs in AWS Glue, a GlueContext is used. The following can be used as a reference entry point.

from amorphicutils.pyspark.infra.spark import get_spark
from amorphicutils.pyspark.infra.gluespark import GlueSpark
from amorphicutils.amorphiclogging import Log4j
import sys
import argparse


def main(app_master=None, app_name=None):
    """Entry point for the Spark application logic.

    When ``app_master`` is falsy, a ``GlueSpark`` instance supplies the Spark
    session and logger; otherwise a session is obtained through
    ``get_spark(app_name)`` with a ``Log4j`` logger set to ``INFO``. The
    user-supplied logic then runs in the marked section below.

    Args:
        app_master: Spark master given on the command line; falsy selects
            the ``GlueSpark`` path.
        app_name: App name passed to ``get_spark`` on the non-Glue path.

    Raises:
        Exception: Any exception raised during initialisation or by the
            application logic is logged and re-raised with its original
            type and traceback.
    """
    # Bound before the try block so the except/finally clauses can tell how
    # far initialisation got instead of failing on an unbound local.
    glue_spark = None
    logger = None
    try:
        if not app_master:
            glue_spark = GlueSpark()
            spark = glue_spark.get_spark()
            logger = glue_spark.get_logger()
            logger.info("Initialized Glue Context")
        else:
            spark = get_spark(app_name)
            spark_logger = Log4j(spark)
            spark_logger.set_level("INFO")
            logger = spark_logger.get_logger()
            logger.info("Initialized Local Spark Context")
        logger.info("Spark App Name: {0}".format(spark.conf.get("spark.app.name")))

        ####################################
        ############ Your Logic ############
        ####################################

    except Exception as ex:
        message = "Failed to execute main with error: {0}".format(str(ex))
        if logger is not None:
            logger.error(message)
        else:
            # The failure happened before a logger existed; still report it.
            sys.stderr.write(message + "\n")
        # Bare raise keeps the original exception type and traceback.
        raise
    finally:
        # Only set on the Glue path, and only once GlueSpark() succeeded.
        # NOTE(review): this commits on failure as well, as the original
        # did - confirm that is the intended behaviour.
        if glue_spark is not None:
            glue_spark.commit_job()


def parse_arguments():
    """Parse the command-line options for the PySpark entry point.

    Recognises ``--master`` and ``--app_name``; any other options on the
    command line are ignored. When the master value contains ``"local"`` and
    no app name is given, a message is printed and the process exits with
    status 1.

    Returns:
        dict: Parsed values keyed by ``master`` and ``app_name`` (``None``
        for an option that was not supplied).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--master", help="User local[*] or local[<required cores>] to execute locally")
    parser.add_argument("--app_name", help="Name of the app when launched locally")

    # parse_known_args tolerates extra options instead of aborting on them.
    namespace, _ = parser.parse_known_args()
    arguments = vars(namespace)

    master = arguments.get("master")
    runs_locally = bool(master) and "local" in master
    if runs_locally and not arguments.get("app_name"):
        print("--app_name argument is required when running locally")
        sys.exit(1)
    return arguments


if __name__ == '__main__':
    # Parse the command line once, echo the parsed values, then run the job.
    args = parse_arguments()
    print(args)

    main(app_master=args.get("master"), app_name=args.get("app_name"))