python - How to pass credentials from a ParDo into a ReadFromJdbc IO Connector - Stack Overflow


I have a requirement to securely get database credentials, which I'm able to accomplish using a ParDo. However, I'd like to use a ReadFromJdbc IO connector, and I'm facing a challenge passing the credentials (username, password) in to the connector. I'm new to Apache Beam/Dataflow, and I've experimented with side inputs, but something is wrong.

Any help is appreciated!

Here are my ParDos and part of my pipeline definition:

import apache_beam as beam
from apache_beam import Pipeline
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner

class FetchDBCredentials(beam.DoFn):
    def process(self, element):
        # get credentials here
        cred = {
            'username': 'username',
            'password': 'password'
        }
        yield cred

# pipeline_options and jdbc_url are defined elsewhere
with Pipeline(DataflowRunner(), options=pipeline_options) as p:

    credentials = (
        p
        | 'Create' >> beam.Create([None])
        | 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
    )

    username = beam.pvalue.AsSingleton(credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username']))
    password = beam.pvalue.AsSingleton(credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))


    rows = (
        p
        | 'ReadFromJdbc' >> ReadFromJdbc(
            table_name='test',
            driver_class_name='org.postgresql.Driver',
            jdbc_url=jdbc_url,
            username=username,
            password=password,
            query='''
                SELECT
                    employee_id,
                    user_id
                FROM db.test
            '''
        )
    )

I receive the following error: AttributeError: 'AsSingleton' object has no attribute 'encode'

asked Jan 9 at 18:54 by Gina Carson (edited Jan 9 at 18:58)

1 Answer


This should be the correct code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class FetchDBCredentials(beam.DoFn):
    def process(self, element):
        # Replace with your actual logic to fetch credentials securely
        cred = {
            'username': 'your_username',
            'password': 'your_password'
        }
        yield cred

class ReadFromJdbcFn(beam.DoFn):
    # ReadFromJdbc is a PTransform and cannot be applied inside process(),
    # so this DoFn queries PostgreSQL directly with psycopg2 (an assumption:
    # any Python DB-API driver works the same way).
    def __init__(self, jdbc_url, query):
        # psycopg2 accepts a libpq URI, so strip the 'jdbc:' prefix.
        self.dsn = jdbc_url[len('jdbc:'):] if jdbc_url.startswith('jdbc:') else jdbc_url
        self.query = query

    def process(self, element, username, password):
        # username/password arrive here as plain str values because they
        # are passed to beam.ParDo as side inputs.
        # Assumes psycopg2 is installed on the workers (e.g. --requirements_file).
        import psycopg2
        conn = psycopg2.connect(self.dsn, user=username, password=password)
        try:
            with conn.cursor() as cur:
                cur.execute(self.query)
                for row in cur:
                    yield row
        finally:
            conn.close()

def run():
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        jdbc_url = "jdbc:postgresql://your_host:5432/your_database"

        credentials = (
            p
            | 'Create' >> beam.Create([None])
            | 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
        )

        # Side inputs must be passed to beam.ParDo, not to the DoFn constructor.
        username = beam.pvalue.AsSingleton(
            credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username']))
        password = beam.pvalue.AsSingleton(
            credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))

        rows = (
            p
            | 'Create Placeholder for Jdbc Read' >> beam.Create([None])
            | 'ReadFromJdbc ParDo' >> beam.ParDo(
                ReadFromJdbcFn(
                    jdbc_url=jdbc_url,
                    query='''
                        SELECT
                            employee_id,
                            user_id
                        FROM your_schema.test
                    ''',
                ),
                username=username,
                password=password,
            )
        )

        rows | 'Log Output' >> beam.Map(print)

if __name__ == '__main__':
    run()

Instead of passing the AsSingleton objects directly to ReadFromJdbc, do the read inside another ParDo (ReadFromJdbcFn). ReadFromJdbc expects concrete string values for username and password when the pipeline is constructed, but AsSingleton only wraps a PCollection as a side input: its value is resolved at run time and is available only inside a DoFn's process method, as an argument passed through beam.ParDo. Hope it helps!
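If you want to keep using ReadFromJdbc itself, the credentials have to be plain strings by the time the transform is constructed, which means fetching them in the launcher program before the pipeline is built rather than in a ParDo. A minimal sketch, assuming the launcher can read the secrets from environment variables named DB_USERNAME and DB_PASSWORD (hypothetical names; substitute your secret store's client) and that the JDBC cross-language expansion service (which needs Java) is available:

```python
import os


def fetch_db_credentials():
    """Resolve credentials in the launcher, before the pipeline is built.

    Assumption: DB_USERNAME / DB_PASSWORD are hypothetical environment
    variables; replace this lookup with a call to your secret store.
    """
    username = os.environ.get('DB_USERNAME')
    password = os.environ.get('DB_PASSWORD')
    if not username or not password:
        raise RuntimeError('DB_USERNAME and DB_PASSWORD must be set')
    return username, password


def run(jdbc_url):
    # Beam is imported here so the credential helper above can be used
    # (and tested) without Beam installed.
    import apache_beam as beam
    from apache_beam.io.jdbc import ReadFromJdbc
    from apache_beam.options.pipeline_options import PipelineOptions

    # Plain str values, known at pipeline-construction time.
    username, password = fetch_db_credentials()

    with beam.Pipeline(options=PipelineOptions()) as p:
        rows = p | 'ReadFromJdbc' >> ReadFromJdbc(
            table_name='test',
            driver_class_name='org.postgresql.Driver',
            jdbc_url=jdbc_url,
            username=username,
            password=password,
            query='SELECT employee_id, user_id FROM db.test',
        )
        rows | 'Log Output' >> beam.Map(print)
```

Call run('jdbc:postgresql://your_host:5432/your_database') from the launcher script. The trade-off is that the secret lookup happens once, on the machine that submits the job, instead of on the workers.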
