I have the following test file, which is read by edit.py and loaded into a table. I want to skip the header row so that it does not appear in the output layout; right now the header row is included in the output.
-----Input file
SERIAL|MNO|YR|DESC|AMT|MSRP|DLR|NAME|STATE|DATE|M_CAT|CNTRY|WMON|WMIL|
SADCJ2B|7315 |2017|RAV4 |61,638.96|57,250.00|4495| |PA|20170515|FPACE |CAN|18|8000|
SADCJ2F|C761|2019|CAMRY Premium |56,139.19|57,821.00|5339| |NC|20190531|FPACE |USA|36|1000|
-----Input file
SERIAL|MNO|YR|DESC|AMT|MSRP|DLR|NAME|STATE|DATE|M_CAT|CNTRY|WMON|WMIL|
SADCJ2B|7315 |2017|RAV4 |61,638.96|57,250.00|4495| |PA|20170515|FPACE |CAN|18|8000|
SADCJ2F|C761|2019|CAMRY Premium |56,139.19|57,821.00|5339| |NC|20190531|FPACE |USA|36|1000|
def read_mapping_data():
    """
    Read the metadata JSON file and populate the module-level lookup tables.

    The file named by ``metadata_file_name`` must contain two top-level
    objects, ``layouts`` and ``transforms``.  From them this function fills:

    * ``layouts`` - one row per layout holding the layout's attributes.
      Only attributes that are columns of ``layouts`` are stored.
    * ``layout_fields`` - for every layout that has a ``fields`` list, a
      DataFrame indexed by the field ``seq`` (sorted) with the columns
      ``name``, ``start``, ``end`` and ``length``.  Positions that are not
      given in the metadata are stored as ``""``.
    * ``transforms`` - one row per transform holding its attributes.
    * ``transform_fields`` - for every transform that has a ``fields`` list,
      a DataFrame indexed by ``out_field_name`` with the columns
      ``in_field_name`` and ``transform`` (``""`` when not given).

    :return: None
    """
    with open(metadata_file_name, 'r') as metadata_file:
        metadata = json.load(metadata_file)

    for layout_name, layout in metadata["layouts"].items():
        # Build the whole row first and assign it in one step.  Assigning
        # through ``layouts.loc[name][key] = value`` is chained indexing and
        # is not guaranteed to write back into the frame.
        attributes = {key: value for key, value in layout.items()
                      if key != "fields" and key in layouts.columns}
        layouts.loc[layout_name] = pd.Series(attributes, index=layouts.columns, dtype=object)

        if "fields" in layout:
            fields_df = pd.DataFrame(columns=['name', 'start', 'end', 'length'])
            for field in layout['fields']:
                start = int(field['start']) if 'start' in field else ""
                end = int(field['end']) if 'end' in field else ""
                # A length can only be derived when both positions are known.
                length = end - start + 1 if start != "" and end != "" else ""
                fields_df.loc[field['seq']] = pd.Series(
                    {'name': field['name'], 'start': start, 'end': end, 'length': length})
            layout_fields[layout_name] = fields_df.sort_index()

    for transform_name, mapping in metadata["transforms"].items():
        attributes = {key: value for key, value in mapping.items()
                      if key != "fields" and key in transforms.columns}
        transforms.loc[transform_name] = pd.Series(attributes, index=transforms.columns, dtype=object)

        if "fields" in mapping:
            mapping_df = pd.DataFrame(columns=['in_field_name', 'transform'])
            for field in mapping['fields']:
                mapping_df.loc[field['out_field_name']] = pd.Series(
                    {'in_field_name': field.get('in_field_name', ""),
                     'transform': field.get('transform', "")})
            transform_fields[transform_name] = mapping_df

    logger.info("=====================================")
    logger.info("Reading Metadata Completed")
    logger.info("=====================================")
def validate_arguments():
    """
    Validate the arguments passed on the command line.

    The only check performed is that the metadata JSON file named by
    ``metadata_file_name`` exists.  Every problem found is written to the
    error log.

    :return: True when no problem was found, False otherwise
    """
    error_messages = []
    if not os.path.isfile(metadata_file_name):
        error_messages.append(
            "Metadata JSON file {} not available".format(metadata_file_name))

    if error_messages:
        logger.error("=====================================")
        logger.error("=========Errors in Arguments=========")
        for error_message in error_messages:
            logger.error(error_message)
        logger.error("=====================================")
        return False
    return True
def validate_metadata():
    """
    Validate the metadata of the transforms that are going to be processed.

    The transforms checked are those listed in ``transform_names``; when that
    list is empty every transform loaded from the metadata is checked.  For
    each transform the following is verified:

    * the transform and both of its layouts exist in the metadata;
    * the transform type is ``TRANSPOSE`` or ``MAP`` and a ``TRANSPOSE``
      transform has a ``key_column`` on its input layout;
    * file/JSON inputs exist and file/JSON outputs do not exist yet;
    * fixed-width file layouts have a position for every field;
    * table inputs/outputs and query inputs can be selected from using
      ``staging_db_engine``;
    * the output type is not ``QUERY``.

    Every problem found is written to the error log.

    :return: True when no problem was found, False otherwise
    """

    def _optional(spec, key, default=""):
        # Metadata attributes that were not supplied are stored as null.
        value = spec[key]
        return default if pd.isnull(value) else value

    def _has_missing_positions(layout_name):
        fields = layout_fields.get(layout_name)
        return fields is None or "" in fields['length'].tolist()

    def _schema_of(spec):
        dsn = str(_optional(spec, 'dsn', "stg_dsn")).lower()
        return cfg["dsn:{}:schema".format(dsn)]

    error_messages = []
    names_to_check = transform_names if transform_names else transforms.index.tolist()

    for transform_name in names_to_check:
        # This has to be checked first: every lookup below would raise a
        # KeyError for an unknown transform.
        if transform_name not in transforms.index:
            error_messages.append(
                "Transform {} is not available in metadata JSON file {}".format(transform_name, metadata_file_name))
            continue

        transform = transforms.loc[transform_name]
        input_layout = transform['input']
        output_layout = transform['output']
        unknown_layouts = [name for name in (input_layout, output_layout)
                           if pd.isnull(name) or name not in layouts.index]
        if unknown_layouts:
            error_messages.append(
                "Transform {} refers to layout(s) {} not available in metadata JSON file {}".format(
                    transform_name, unknown_layouts, metadata_file_name))
            continue

        in_spec = layouts.loc[input_layout]
        out_spec = layouts.loc[output_layout]
        input_type = str(_optional(in_spec, 'type')).upper()
        output_type = str(_optional(out_spec, 'type')).upper()
        transform_type = str(_optional(transform, 'type')).upper()
        # NOTE(review): the query is upper-cased here because parse_input
        # upper-cases it before running it; this alters quoted literals.
        in_query = str(_optional(in_spec, 'query')).upper()
        key_column = _optional(in_spec, 'key_column')
        in_file_location = _optional(in_spec, 'location')
        out_file_location = _optional(out_spec, 'location')
        input_delimiter = str(_optional(in_spec, 'delimiter')).upper()
        output_delimiter = str(_optional(out_spec, 'delimiter')).upper()

        for layout_name, layout_type in ((input_layout, input_type), (output_layout, output_type)):
            if layout_type == "":
                error_messages.append(
                    "Transform {} layout {} has no type".format(transform_name, layout_name))

        if transform_type != "TRANSPOSE" and transform_type != "MAP":
            error_messages.append(
                "Unknown transform_type {} for transform {}".format(transform_type, transform_name))
        if key_column == "" and transform_type == "TRANSPOSE":
            error_messages.append(
                "Key_column tag is missing in transform_type {} for transform {}".format(transform_type,
                                                                                         transform_name))

        if input_type == 'FILE' or input_type == 'JSON':
            in_file_name = file_names.get(input_layout)
            if in_file_name is None or not os.path.isfile(in_file_location + in_file_name):
                error_messages.append(
                    "Transform {} input File {} not available".format(transform_name, in_file_name))
        if output_type == 'FILE' or output_type == 'JSON':
            out_file_name = file_names.get(output_layout)
            if out_file_name is None:
                error_messages.append(
                    "Transform {} output File name for layout {} not supplied".format(transform_name,
                                                                                      output_layout))
            elif os.path.isfile(out_file_location + out_file_name):
                error_messages.append(
                    "Transform {} output File {} already available".format(transform_name, out_file_name))

        if input_type == 'FILE' and input_delimiter == 'FIXED' and _has_missing_positions(input_layout):
            error_messages.append(
                "Transform {} input type is fixed but some field positions are missing".format(transform_name))
        if output_type == 'FILE' and output_delimiter == 'FIXED' and _has_missing_positions(output_layout):
            error_messages.append(
                "Transform {} output type is fixed but some field positions are missing".format(transform_name))

        # The "where 1=2" selects return no rows; they only prove that the
        # table or query can be selected from.
        if input_type == 'TABLE':
            in_schema = _schema_of(in_spec)
            try:
                pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(in_schema, input_layout),
                            con=staging_db_engine)
            except Exception:
                error_messages.append(
                    "Transform {} Input Table {} not available".format(transform_name, input_layout))

        if output_type == 'TABLE':
            out_schema = _schema_of(out_spec)
            try:
                pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(out_schema, output_layout),
                            con=staging_db_engine)
            except Exception:
                error_messages.append(
                    "Transform {} Output Table {} not available".format(transform_name, output_layout))

        if input_type == 'QUERY':
            try:
                pd.read_sql("select count(*) cnt from ({}) where 1=2".format(in_query), con=staging_db_engine)
            except Exception:
                error_messages.append(
                    "Transform {} Input Query {} not executable".format(transform_name, input_layout))

        if output_type == 'QUERY':
            error_messages.append(
                "Transform {} Output type Query not supported".format(transform_name))

    if error_messages:
        logger.error("=====================================")
        logger.error("=========Errors in Metadata==========")
        for error_message in error_messages:
            logger.error(error_message)
        logger.error("=====================================")
        return False
    return True
def parse_input(l_transform_name):
    """
    Read the input of one or all transforms into ``inputs``.

    For every selected transform the input layout is looked up in
    ``layouts`` and read into a DataFrame according to its ``type``
    (``FILE``, ``JSON``, ``TABLE`` or ``QUERY``).  The DataFrame is stored in
    ``inputs`` under the input layout name.

    Layout attributes used for ``FILE`` inputs:

    * ``header`` - number of lines skipped at the top of the file (default
      0).  Column names always come from the layout's field list, so a file
      that carries a header line needs ``header`` set to 1, otherwise that
      line is read as a data row.
    * ``footer`` - number of lines skipped at the bottom (default 0).
    * ``delimiter`` - ``FIXED`` for fixed-width files, ``TAB`` for
      tab-separated files, otherwise the separator itself.
    * ``quotechar`` - quote character (default ``"``).
    * ``location`` - prefix put in front of the file name.

    :param l_transform_name: name of the transform to read the input for;
        ``""`` reads the input of every transform in ``transforms``
    :return: None
    """

    def _optional(spec, key, default=""):
        # Metadata attributes that were not supplied are stored as null.
        value = spec[key]
        return default if pd.isnull(value) else value

    if l_transform_name == "":
        selected_transforms = transforms.index.values.tolist()
    else:
        selected_transforms = [l_transform_name]

    for transform_name in selected_transforms:
        try:
            input_layout = transforms.loc[transform_name]['input']
            spec = layouts.loc[input_layout]
            input_type = spec['type'].upper()
            logger.info("=====================================")
            logger.info("Reading input {} for transform {}".format(input_layout, transform_name))
            logger.info("=====================================")
            delimiter = _optional(spec, 'delimiter').upper()
            header = int(_optional(spec, 'header', 0))
            footer = int(_optional(spec, 'footer', 0))
            quote_char = _optional(spec, 'quotechar', '"')
            file_location = _optional(spec, 'location')
            # NOTE(review): upper-casing the query also upper-cases quoted
            # literals inside it; kept because existing metadata may rely on
            # the resulting column names.
            query = _optional(spec, 'query').upper()
            dsn = _optional(spec, 'dsn', "stg_dsn").lower()
            db_engine = db.get_db_engine(cfg, dsn)
            db_schema = cfg["dsn:{}:schema".format(dsn)]

            if input_type == 'FILE':
                file_path = file_location + file_names[input_layout]
                fields = layout_fields[input_layout]
                if delimiter == 'FIXED':
                    # NOTE(review): read_fwf colspecs are zero-based,
                    # half-open intervals while ``length`` is computed as
                    # end - start + 1 - confirm the metadata positions match.
                    df = pd.read_fwf(file_path,
                                     skiprows=header, skipfooter=footer,
                                     colspecs=list(fields[['start', 'end']].itertuples(index=False)),
                                     names=fields['name'].tolist())
                else:
                    if delimiter == 'TAB':
                        delim = "\t"
                    elif delimiter == "":
                        # No delimiter configured: keep the separator this
                        # reader has always used.
                        delim = "|"
                    else:
                        delim = delimiter
                    df = pd.read_csv(file_path, delimiter=delim,
                                     skiprows=header, skipfooter=footer, escapechar='\\', encoding='UTF-8',
                                     quotechar=quote_char,
                                     names=fields['name'].tolist(), dtype=object)
            elif input_type == 'JSON':
                df = pd.read_json(file_location + file_names[input_layout])
            elif input_type == 'TABLE':
                df = pd.read_sql_table(input_layout, con=db_engine, schema=db_schema)
            elif input_type == 'QUERY':
                # read_sql has no ``schema`` argument; the query has to
                # qualify its own objects.
                df = pd.read_sql(query, con=db_engine)
            else:
                logger.warning("Unknown type {} for input {}".format(input_type, input_layout))
                # Nothing was read, so there is nothing to store.
                continue

            if df.empty:
                logger.warning("input {} is empty".format(input_layout))
            inputs[input_layout] = df
            logger.info(df)
        except Exception:
            logger.error("Unexpected error while reading input for transform {}: {}".format(
                transform_name, sys.exc_info()[0]))
            raise
def process_data(l_transform_name):
    """
    Run the processing action of one or all transforms.

    The transform ``type`` selects the action: ``TRANSPOSE`` calls
    ``transpose_data`` and ``MAP`` calls ``data_mapping``.  Any other type is
    logged as a warning and stops the processing of the remaining
    transforms.

    :param l_transform_name: name of the transform to process; ``""``
        processes every transform in ``transforms``
    :return: None
    """
    handlers = {"TRANSPOSE": transpose_data, "MAP": data_mapping}

    if l_transform_name == "":
        selected_transforms = transforms.index.values.tolist()
    else:
        selected_transforms = [l_transform_name]

    for transform_name in selected_transforms:
        transform_type = transforms.loc[transform_name]['type'].upper()
        handler = handlers.get(transform_type)
        if handler is None:
            logger.warning("Unknown transform_type {} for transform {}".format(transform_type, transform_name))
            break
        handler(transform_name)
def transpose_data(l_transform_name):
    """
    Transpose the input of a transform into attribute/value rows.

    Every row of the input DataFrame becomes one output row per input
    column, with the columns ``attribute`` (input column name), ``value``
    (cell value), ``key`` (the row's value in the input layout's
    ``key_column``) and ``process_date``.  The result is appended to
    ``outputs`` under the transform's output layout name.

    :param l_transform_name: name of the transform to apply
    :return: None
    """
    input_layout = transforms.loc[l_transform_name]['input']
    output_layout = transforms.loc[l_transform_name]['output']
    key_column = layouts.loc[input_layout]['key_column']
    key_column = "" if pd.isnull(key_column) else key_column
    source = inputs[input_layout]

    # Collect the per-row frames and concatenate once; growing a DataFrame
    # row by row copies it on every step.
    transposed_rows = []
    for index in source.index:
        row_df = source.loc[[index]].T.reset_index()
        row_df = row_df.rename(index=str, columns={"index": "attribute", index: "value"})
        row_df['key'] = source.at[index, key_column]
        row_df['process_date'] = process_date
        transposed_rows.append(row_df)

    if transposed_rows:
        out_df = pd.concat(transposed_rows, ignore_index=True)
    else:
        out_df = pd.DataFrame(columns=['attribute', 'value'])

    if output_layout in outputs:
        outputs[output_layout] = pd.concat([outputs[output_layout], out_df], ignore_index=True)
    else:
        outputs[output_layout] = out_df
    logger.debug(out_df)
def data_mapping(l_transform_name):
    """
    Map the input fields of a transform onto its output layout.

    The output DataFrame has one row per input row and one column per field
    of the output layout.  For every output field the entry in
    ``transform_fields`` decides the content:

    * no ``in_field_name`` - the column is filled with ``""``;
    * ``in_field_name`` without ``transform`` - the input column is copied;
    * ``transform`` of the form ``[start:end]`` (either bound may be left
      out) - the input column is sliced as a string;
    * any other ``transform`` - logged as a warning, the column stays empty.

    The transform attributes ``trim``, ``upper`` and ``lower`` (``YES`` to
    enable) are then applied to the string values of every column.  The
    result is appended to ``outputs`` under the output layout name.

    :param l_transform_name: name of the transform to apply
    :return: None
    """
    import re

    def _flag(name):
        value = transforms.loc[l_transform_name][name]
        return "NO" if pd.isnull(value) else value.upper()

    def _blank_if_null(value):
        return "" if pd.isnull(value) else value

    output_layout = transforms.loc[l_transform_name]['output']
    input_layout = transforms.loc[l_transform_name]['input']
    trim = _flag('trim')
    upper = _flag('upper')
    lower = _flag('lower')

    def _normalise(value):
        # Nulls and non-string values have no case and nothing to strip.
        if not isinstance(value, str):
            return value
        if upper == "YES":
            value = value.upper()
        elif lower == "YES":
            value = value.lower()
        if trim == "YES":
            value = value.strip()
        return value

    source = inputs[input_layout]
    field_map = transform_fields.get(l_transform_name)
    if field_map is None:
        field_map = pd.DataFrame(columns=['in_field_name', 'transform'])

    # Whole-number bounds; the previous pattern kept only the last digit of
    # each bound and could not handle an omitted bound.
    slice_pattern = re.compile(r'^\[(\d*):(\d*)\]$')
    needs_normalising = "YES" in (trim, upper, lower)

    # Start from the input index so that every column, including the ones
    # filled with "", has one value per input row.
    out_df = pd.DataFrame(index=source.index, columns=layout_fields[output_layout]['name'].tolist())
    for column in out_df.columns:
        if column in field_map.index:
            input_field = _blank_if_null(field_map.loc[column]['in_field_name'])
            transform = _blank_if_null(field_map.loc[column]['transform'])
        else:
            input_field = ""
            transform = ""

        if input_field == "":
            out_df[column] = ""
        elif transform == "":
            out_df[column] = source[input_field]
        else:
            bounds = slice_pattern.match(transform)
            if bounds:
                start = int(bounds.group(1)) if bounds.group(1) else None
                stop = int(bounds.group(2)) if bounds.group(2) else None
                out_df[column] = source[input_field].str[start:stop]
            else:
                logger.warning("Unsupported transform {} for field {} in transform {}".format(
                    transform, column, l_transform_name))

        if needs_normalising:
            out_df[column] = out_df[column].apply(_normalise)

    if output_layout in outputs:
        outputs[output_layout] = pd.concat([outputs[output_layout], out_df], ignore_index=False)
    else:
        outputs[output_layout] = out_df
    logger.debug("output_layout:")
    logger.debug(out_df)
def write_output():
    """
    Write every DataFrame in ``outputs`` according to its output layout.

    The layout ``type`` selects the target:

    * ``FILE`` - a text file at ``location`` + ``file_names[layout]``.  With
      delimiter ``FIXED`` each column is padded on the left and truncated to
      the field ``length`` and the columns are concatenated into one line
      per row; otherwise a delimited file is written (``TAB`` means a tab,
      no delimiter means ``,``).  No header line is written.
    * ``JSON`` - a JSON file with one record per row.
    * ``TABLE`` - rows are appended to the table named after the layout.

    Any other type is logged as an error.  Errors raised while writing are
    logged and re-raised.

    :return: None
    """

    def _optional(spec, key, default=""):
        # Metadata attributes that were not supplied are stored as null.
        value = spec[key]
        return default if pd.isnull(value) else value

    for output_layout in outputs:
        spec = layouts.loc[output_layout]
        output_type = spec['type'].upper()
        # Index a copy by field name; layout_fields itself stays untouched.
        output_fields = layout_fields[output_layout].set_index('name', drop=False)
        delimiter = _optional(spec, 'delimiter').upper()
        quote_char = _optional(spec, 'quotechar', '"')
        file_location = _optional(spec, 'location')
        dsn = _optional(spec, 'dsn', "stg_dsn").lower()
        db_schema = cfg["dsn:{}:schema".format(dsn)]
        db_engine = db.get_db_engine(cfg, dsn)
        frame = outputs[output_layout]
        try:
            if output_type == 'FILE':
                file_path = file_location + file_names[output_layout]
                if delimiter == 'FIXED':
                    fixed_lines = pd.Series("", index=frame.index)
                    for column_name in frame.columns.values.tolist():
                        width = output_fields.loc[column_name]['length']
                        fixed_lines = fixed_lines + frame[column_name].str.pad(width).str[:width]
                    if 'key' in output_fields.index and 'key' in frame.columns:
                        if output_fields.loc['key']['length'] < frame['key'].str.len().max():
                            logger.error("Key is getting truncated while writing the output {}".format(output_layout))
                    # Written directly instead of through to_csv so that
                    # quote characters in the data cannot break the write.
                    with open(file_path, "w", encoding="utf-8") as fixed_file:
                        for line in fixed_lines.fillna(""):
                            fixed_file.write(line + "\n")
                else:
                    if delimiter == 'TAB':
                        delim = "\t"
                    elif delimiter == "":
                        delim = ","
                    else:
                        delim = delimiter
                    frame.to_csv(file_path, header=False, index=False,
                                 quoting=csv.QUOTE_NONNUMERIC, quotechar=quote_char, sep=delim)
            elif output_type == 'JSON':
                with open(file_location + file_names[output_layout], "w") as json_file:
                    json_file.write(frame.to_json(orient='records'))
            elif output_type == 'TABLE':
                frame.to_sql(name=output_layout, con=db_engine, if_exists='append',
                             index=False, schema=db_schema, chunksize=5000)
            else:
                logger.error("{} not implemented yet".format(output_type))
        except Exception:
            logger.error("Error in writing the output {}".format(output_layout))
            raise
# ----------------- PROCESS BEGINS ----------------------
########################################
# Parse arguments
########################################
parser = argparse.ArgumentParser(description='Process data')
parser.add_argument("--metadata_file_name", metavar='Metadata JSON File Name', type=str, required=True,
                    help='Metadata JSON config file')
parser.add_argument("--file_names", metavar='File Names Dict', type=str, default="{}",
                    help='File names in Dict format')
parser.add_argument("--transform_names", metavar='Transform Name', type=str, default="[]",
                    help='Name of transform from metadata json, process all transforms if none specified')
parser.add_argument("--module_name", metavar='Module Name', type=str, default="",
                    help='Module Name')
args = parser.parse_args()
logger.info("=====================================")
logger.info("Arguments: {}".format(vars(args)))
logger.info("=====================================")

# Module-level state shared by the functions above.
process_date = datetime.now().isoformat()
# --file_names and --transform_names are Python literals (dict / list).
file_names = ast.literal_eval(args.file_names)
staging_db_engine = db.get_db_engine(cfg, 'stg_dsn')
transform_names = ast.literal_eval(args.transform_names)
metadata_file_name = args.metadata_file_name if os.path.isabs(args.metadata_file_name) \
    else os.path.abspath(os.path.expanduser(args.metadata_file_name))
layouts = pd.DataFrame(columns=['type', 'delimiter', 'location', 'key_column', 'query',
                                'header', 'footer', 'quotechar', 'dsn'])
layout_fields = {}
transforms = pd.DataFrame(columns=['input', 'output', 'type', 'trim', 'upper', 'lower'])
transform_fields = {}
inputs = {}
outputs = {}

########################################
# Get lock
########################################
# Acquire lock to make sure only one instance is running for a given metadata JSON
if misc.get_lock("transform_{}".format(os.path.basename(args.metadata_file_name))) is not True:
    logger.error("Failed to get lock. Another instance may be running.")
    sys.exit(1)

try:
    logger.info("=====================================")
    logger.info("Validate the arguments passed")
    logger.info("=====================================")
    if not validate_arguments():
        logger.error("Invalid arguments.")
        sys.exit(1)

    logger.info("=====================================")
    logger.info("Reading data from JSON: {}".format(args.metadata_file_name))
    logger.info("=====================================")
    read_mapping_data()

    logger.info("=====================================")
    logger.info("Validate data")
    logger.info("=====================================")
    if not validate_metadata():
        logger.error("Errors in data.")
        sys.exit(1)

    logger.info("=====================================")
    logger.info("Reading all the inputs")
    logger.info("=====================================")
    # No transform named on the command line means "all transforms", which
    # parse_input/process_data select when given "".
    if not transform_names:
        parse_input("")
    else:
        for transform_name in transform_names:
            parse_input(transform_name)

    logger.info("=====================================")
    logger.info("Apply {} transform".format(transform_names))
    logger.info("=====================================")
    if not transform_names:
        process_data("")
    else:
        for transform_name in transform_names:
            process_data(transform_name)

    logger.info("=====================================")
    logger.info("Write the output")
    logger.info("=====================================")
    write_output()
    #write_to_output_upsert()
finally:
    # cleanup
    logger.debug("Clean")
