May-18-2020, 08:27 AM
When I try to use the below code to load data from S3 bucket into AWS ES, I am able to get request post status code as 201.
from __future__ import print_function
import boto3
import datetime
import urllib
import urllib3
import logging
from pprint import pprint
import csv
import io
import json
from collections import defaultdict
from requests_aws4auth import AWS4Auth
import requests
globalVars = {}
globalVars['Owner'] = "cyberdemo"
globalVars['Environment'] = "Prod"
globalVars['awsRegion'] = "us-east-2"
globalVars['tagName'] = "serverless-s3-to-es-log-ingester"
globalVars['service'] = "es"
globalVars['esIndexPrefix'] = "es-logs-"
globalVars['esIndexDocType'] = "es_docs"
globalVars['esHosts'] = {
'test': '' ,
'prod': 'https://search-cyberlabs-xxxxxxxxxxx.us-east-2.es.amazonaws.com'
}
s3 = boto3.client('s3')
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth( credentials.access_key,
credentials.secret_key,
globalVars['awsRegion'],
globalVars['service'],
session_token=credentials.token
)
obj = s3.get_object(Bucket = 'bucket-name', Key = 'logs/conn_log.csv')
lines = obj['Body'].read().decode("utf-8").replace("'", '"')
indexName = globalVars['esIndexPrefix'] + str( datetime.date.today().year ) + '-' + str( datetime.date.today().month )
es_Url = globalVars['esHosts'].get('prod') + '/' + indexName + '/' + globalVars['esIndexDocType']
lines = lines.splitlines()
if (isinstance(lines, str)):
lines = [lines]
docData = {}
docData['objectKey'] = str(key)
docData['createdDate'] = str(obj['LastModified'])
docData['content_type'] = str(obj['ContentType'])
docData['content_length'] = str(obj['ContentLength'])
for line in lines:
docData['content'] = str(line)
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)
print(resp)Output:<Response [201]>Upon printing docData, I get the following output. As you can see 'content' include the single row of data.Output:{'objectKey': 'logs/conn_log.csv', 'createdDate': '2020-05-18 06:16:59+00:00', 'content_type': 'text/csv', 'content_length': '3888', 'content': '\ufeffts,uid,id.orig_h,id.orig_p,id.resp_h,id.resp_p,proto,service,duration,orig_bytes,resp_bytes,conn_state,local_orig,missed_bytes,history,orig_pkts,orig_ip_bytes,resp_pkts,resp_ip_bytes'}
{'objectKey': 'logs/conn_log.csv', 'createdDate': '2020-05-18 06:16:59+00:00', 'content_type': 'text/csv', 'content_length': '3888', 'content': '16/03/2012 20:30:00,CCUIP21wTjqkj8ZqX5,192.168.202.79,27/02/2038 00:00:00,192.168.229.251,20/03/1900 00:00:00,tcp,-,-,-,-,SH,-,00/01/1900 00:00:00,Fa,1/1/1900 0:00,21/02/1900 00:00:00,1/1/1900 0:00,21/02/1900 00:00:00'}Instead, I want to split the contents as shown belowOutput:{'\ufeffts': '16/03/2012 20:30:00', 'uid': 'Cozi0S1MAcO3HMgufa', 'id.orig_h': '192.168.202.79', 'id.orig_p': '17/04/2026 00:00:00', 'id.resp_h': '192.168.229.254', 'id.resp_p': '18/03/1901 00:00:00', 'proto': 'tcp', 'service': 'ssl', 'duration': '00/01/1900 00:14:24', 'orig_bytes': '1/7/1901 0:00', 'resp_bytes': '25/11/1902 00:00:00', 'conn_state': 'SF', 'local_orig': '-', 'missed_bytes': '00/01/1900 00:00:00', 'history': 'ShADadfFr', 'orig_pkts': '8/1/1900 0:00', 'orig_ip_bytes': '29/08/1902 00:00:00', 'resp_pkts': '13/01/1900 00:00:00', 'resp_ip_bytes': '9/10/1904 0:00', 'objectKey': 'logs/conn_log.csv', 'createdDate': '2020-05-18 06:16:59+00:00', 'content_type': 'text/csv', 'content_length': '3888'}Therefore I modified the above code from splitting line portion . Upon posting the request using the below code, I get status code as 400.buf = io.StringIO(lines)
reader = csv.DictReader(buf)
for line in reader:
line['objectKey'] = str(key)
line['createdDate'] = str(obj['LastModified'])
line['content_type'] = str(obj['ContentType'])
line['content_length'] = str(obj['ContentLength'])
Data = json.dumps(line)
docData = Data.replace('"', "'")
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)
print(resp)Output:<Response [400]>I need to get status code as 201 for the modified code. Can anyone help?. Thanks
