Aug-06-2021, 07:37 PM
Hi everyone.
Iam trying to call connection_string method inside run_oracle_job and Im not sure if its better way to do that.
Im getting this error. TypeError: run_oracle_job() missing 1 required positional argument: 'connection_string'
Someone knows what is missing here?
Im running this code on Apache Airflow and using PythonVirtualenvOperator
Iam trying to call connection_string method inside run_oracle_job and Im not sure if its better way to do that.
Im getting this error. TypeError: run_oracle_job() missing 1 required positional argument: 'connection_string'
Someone knows what is missing here?
Im running this code on Apache Airflow and using PythonVirtualenvOperator
def connection_string(**kwargs):
from airflow.hooks.oracle_hook import OracleHook
connection = OracleHook.get_connection(kwargs['oracle_conn'])
user = connection.login
password = connection.password
host = connection.host
port = connection.port
service_name = connection.extra_dejson.get('service_name', None)
return f'oracle+cx_oracle://{user}:{password}@{host}:{port}/?service_name={service_name}'
def run_oracle_job(connection_string, *args, **kwargs):
where_clause_suffix = textwrap.dedent("""
where table_schema = 'public'
""")
tmp_folder = '/var/tmp/amundsen/table_metadata'
node_files_folder = f'{tmp_folder}/nodes/'
relationship_files_folder = f'{tmp_folder}/relationships/'
job_config = ConfigFactory.from_dict({
f'extractor.oracle_metadata.{OracleMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
f'extractor.oracle_metadata.{OracleMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': True,
f'extractor.oracle_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR}': True,
f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag', # should use unique tag here like {ds}
})
job = DefaultJob(conf=job_config,
task=DefaultTask(extractor=OracleMetadataExtractor(), loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
return jobwhat is missing here please?
