- import apache_beam as beam
- import logging
- import typing
- from apache_beam.io.kafka import ReadFromKafka
- from apache_beam.io.kafka import WriteToKafka
- from apache_beam.options.pipeline_options import PipelineOptions
def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
    """Run a streaming pipeline that reads from Kafka and then fails on purpose.

    The pipeline reads records from ``in_topic``, places them in fixed
    windows, groups them by key and then raises for every grouped element.
    It exists to exercise the Kafka read path and failure handling; nothing
    is ever written back to Kafka.

    Args:
        bootstrap_servers: Value for the Kafka consumer's
            ``bootstrap.servers`` setting.
        in_topic: Name of the Kafka topic to read from.
        out_topic: Name of the output topic. Only logged here; this
            function does not write to it.
        pipeline_args: Extra command-line arguments forwarded to
            ``PipelineOptions``.

    Raises:
        Exception: Raised by the final step for every grouped element, so
            the pipeline is expected to fail once data arrives.
    """
    pipeline_options = PipelineOptions(
        pipeline_args, save_main_session=True, streaming=True)

    # logging applies %s lazily, so no explicit str() conversion is needed.
    logging.info(
        'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
        bootstrap_servers, in_topic, out_topic)

    def fail_func(val):
        """Always raise, embedding the repr of the offending element."""
        # Exception() does not interpolate its arguments, so format the
        # message explicitly. The one-tuple wrapper is required because
        # ``val`` is itself a tuple (the grouped key/values pair).
        raise Exception('Intentional failure at %r' % (val,))

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | 'Read from kafka' >> ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': bootstrap_servers,
                    # Start from the earliest available offset when the
                    # consumer has no committed position.
                    'auto.offset.reset': 'earliest',
                },
                topics=[in_topic])
            # Window the unbounded stream before grouping it by key.
            | beam.WindowInto(beam.window.FixedWindows(1))
            | beam.GroupByKey()
            | beam.Map(fail_func))
if __name__ == '__main__':
    # Script entry point: parse the Kafka-related flags and hand everything
    # else through to Beam as pipeline options.
    import argparse

    logging.getLogger().setLevel(logging.INFO)

    # (flag name, help text) for each required command-line option.
    required_flags = (
        ('bootstrap_servers', 'Bootstrap servers for the Kafka cluster'),
        ('in_topic', 'Kafka topic to read data from'),
        ('out_topic', 'Kafka topic to write data to'),
    )

    parser = argparse.ArgumentParser()
    for flag_name, flag_help in required_flags:
        parser.add_argument(
            '--' + flag_name,
            dest=flag_name,
            required=True,
            help=flag_help)

    # Unrecognised arguments are kept and forwarded to the pipeline.
    known_args, pipeline_args = parser.parse_known_args()
    run(
        known_args.bootstrap_servers,
        known_args.in_topic,
        known_args.out_topic,
        pipeline_args)
[text] Kafka read
Viewer
*** This page was generated with the meta tag "noindex, nofollow", either because you selected this option before saving or because the system flagged the paste as spam. As a result, search engines will never index this page and search bots will not crawl it. There is nothing to worry about; you can still share it with anyone.
Editor
You can edit this paste and save as new: