I have a requirement to securely get database credentials, which I'm able to accomplish using a ParDo. However I'd like to use a ReadFromJdbc IO Connector, and I'm facing a challenge passing in the credentials (username, password) to the connector. I'm new to Apache Beam/Dataflow, and I've experimented with side inputs but something is wrong.
Any help is appreciated!
Here are my ParDos and part of my pipeline definition:
class FetchDBCredentials(beam.DoFn):
def process(self, element):
# get credentials here
cred = {
'username': 'username'
'password': 'password'
}
yield cred
with Pipeline(DataflowRunner(), options=pipeline_options) as p:
credentials = (
p
| 'Create' >> beam.Create([None])
| 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
)
username = beam.pvalue.AsSingleton(credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username']))
password = beam.pvalue.AsSingleton(credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))
rows = (
p
| 'ReadFromJdbc' >> ReadFromJdbc(
table_name='test',
driver_class_name='org.postgresql.Driver',
jdbc_url=jdbc_url,
username=username,
password=password,
query='''
SELECT
employee_id,
user_id
FROM db.test
'''
)
)
I receive the following error: AttributeError: 'AsSingleton' object has no attribute 'encode'
I have a requirement to securely get database credentials, which I'm able to accomplish using a ParDo. However I'd like to use a ReadFromJdbc IO Connector, and I'm facing a challenge passing in the credentials (username, password) to the connector. I'm new to Apache Beam/Dataflow, and I've experimented with side inputs but something is wrong.
Any help is appreciated!
Here are my ParDos and part of my pipeline definition:
class FetchDBCredentials(beam.DoFn):
def process(self, element):
# get credentials here
cred = {
'username': 'username'
'password': 'password'
}
yield cred
with Pipeline(DataflowRunner(), options=pipeline_options) as p:
credentials = (
p
| 'Create' >> beam.Create([None])
| 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
)
username = beam.pvalue.AsSingleton(credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username']))
password = beam.pvalue.AsSingleton(credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))
rows = (
p
| 'ReadFromJdbc' >> ReadFromJdbc(
table_name='test',
driver_class_name='org.postgresql.Driver',
jdbc_url=jdbc_url,
username=username,
password=password,
query='''
SELECT
employee_id,
user_id
FROM db.test
'''
)
)
I receive the following error: AttributeError: 'AsSingleton' object has no attribute 'encode'
This should be the correct code:
import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
class FetchDBCredentials(beam.DoFn):
def process(self, element):
# Replace with your actual logic to fetch credentials securely
cred = {
'username': 'your_username',
'password': 'your_password'
}
yield cred
class ReadFromJdbcFn(beam.DoFn):
def __init__(self, jdbc_url, table_name, driver_class_name, query, username, password):
self.jdbc_url = jdbc_url
self.table_name = table_name
self.driver_class_name = driver_class_name
self.query = query
self.username = username
self.password = password
def process(self, element):
rows = (
element
| 'ReadDb' >> ReadFromJdbc(
table_name=self.table_name,
driver_class_name=self.driver_class_name,
jdbc_url=self.jdbc_url,
username=self.username,
password=self.password,
query=self.query
)
)
yield rows
def run():
pipeline_options = PipelineOptions()
with beam.Pipeline(options=pipeline_options) as p:
jdbc_url = "jdbc:postgresql://your_host:5432/your_database"
credentials = (
p
| 'Create' >> beam.Create([None])
| 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
)
# Use CombineGlobally(lambda elements: next(iter(elements)), requires_singleton_input=True)
# to extract the single credential dictionary from the PCollection.
single_credential = credentials | beam.combiners.ToSingleton()
rows = (
p
| 'Create Placeholder for Jdbc Read' >> beam.Create([None])
| 'ReadFromJdbc ParDo' >> beam.ParDo(
ReadFromJdbcFn(
jdbc_url=jdbc_url,
table_name='test',
driver_class_name='org.postgresql.Driver',
query='''
SELECT
employee_id,
user_id
FROM your_schema.test
''',
username=beam.pvalue.AsSingleton(credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username'])),
password=beam.pvalue.AsSingleton(credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))
)
| 'Flatten' >> beam.FlatMap(lambda x: x) # Flatten the nested PCollection
)
rows | 'Log Output' >> beam.Map(print)
if __name__ == '__main__':
run()
Instead of directly passing AsSingleton objects to ReadFromJdbc, encapsulate the ReadFromJdbc call within another ParDo (ReadFromJdbcFn).ReadFromJdbc expects concrete values for username and password. AsSingleton provides a PCollectionView, which needs to be accessed within a ParDo. Hope it helps!