[text] Kafka read

Viewer

copy | download | embed | print
Name: Kafka read
  1. import apache_beam as beam
  2. import logging
  3. import typing
  4. from apache_beam.io.kafka import ReadFromKafka
  5. from apache_beam.io.kafka import WriteToKafka
  6. from apache_beam.options.pipeline_options import PipelineOptions
  7.  
  8. def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
  9.   pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)
  10.  
  11.   logging.info('Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
  12.       str(bootstrap_servers), in_topic, out_topic)
  13.  
  14.   def fail_func(val):
  15.     raise Exception('Intentional failure at %r', val)
  16.  
  17.   with beam.Pipeline(options=pipeline_options) as pipeline:
  18.     _ = (
  19.         pipeline
  20.         | 'Read from kafka' >> ReadFromKafka(
  21.             consumer_config={
  22.                 'bootstrap.servers': bootstrap_servers,
  23.                 'auto.offset.reset': 'earliest'},
  24.             topics=[in_topic])
  25.         | beam.WindowInto(beam.window.FixedWindows(1))
  26.         | beam.GroupByKey()
  27.         | beam.Map(lambda kv: fail_func(kv)))
  28.  
  29. if __name__ == '__main__':
  30.   logging.getLogger().setLevel(logging.INFO)
  31.   import argparse
  32.  
  33.   parser = argparse.ArgumentParser()
  34.   parser.add_argument(
  35.       '--bootstrap_servers',
  36.       dest='bootstrap_servers',
  37.       required=True,
  38.       help='Bootstrap servers for the Kafka cluster')
  39.   parser.add_argument(
  40.       '--in_topic',
  41.       dest='in_topic',
  42.       required=True,
  43.       help='Kafka topic to read data from')
  44.   parser.add_argument(
  45.       '--out_topic',
  46.       dest='out_topic',
  47.       required=True,
  48.       help='Kafka topic to write data to')
  49.   known_args, pipeline_args = parser.parse_known_args()
  50.  
  51.   run(known_args.bootstrap_servers, known_args.in_topic, known_args.out_topic, pipeline_args)

Editor

You can edit this paste and save as new:


File Description
  • Kafka read
  • Paste Code
  • 16 Mar-2021
  • 1.73 Kb
You can share it: