Kafka as a Platform: the Ecosystem from the Ground Up Robin Moffatt | #GOTOpia | @rmoff
A presentation at GOTOpia in September 2020 by Robin Moffatt
 
                Kafka as a Platform: the Ecosystem from the Ground Up Robin Moffatt | #GOTOpia | @rmoff
 
$ whoami • Robin Moffatt (@rmoff) • Senior Developer Advocate at Confluent (Apache Kafka, not Wikis!) • Working in data & analytics since 2001 • Oracle ACE Director (Alumnus) http://rmoff.dev/talks · http://rmoff.dev/blog · http://rmoff.dev/youtube @rmoff | #GOTOpia | @confluentinc
 
EVENTS @rmoff | #GOTOpia | @confluentinc
 
EVENTS • Something happened • What happened
 
                Human generated events A Sale A Stock movement @rmoff | #GOTOpia | @confluentinc
 
                Machine generated events IoT Networking Applications @rmoff | #GOTOpia | @confluentinc
 
                EVENTS are EVERYWHERE @rmoff | #GOTOpia | @confluentinc
 
EVENTS are very POWERFUL @rmoff | #GOTOpia | @confluentinc
 
                 
                 
                K V
 
                LOG @rmoff | #GOTOpia | @confluentinc
 
 
                Immutable Event Log Old New Events are added at the end of the log @rmoff | #GOTOpia | @confluentinc
 
                TOPICS @rmoff | #GOTOpia | @confluentinc
 
                Topics Clicks Orders Customers Topics are similar in concept to tables in a database @rmoff | #GOTOpia | @confluentinc
 
                PARTITIONS @rmoff | #GOTOpia | @confluentinc
 
Partitions Clicks p0 p1 p2 Messages are guaranteed to be strictly ordered within a partition @rmoff | #GOTOpia | @confluentinc
 
PUB / SUB @rmoff | #GOTOpia | @confluentinc
 
                Producing data Old New Messages are added at the end of the log @rmoff | #GOTOpia | @confluentinc
 
                partition 0 … partition 1 producer … partition 2 … Partitioned Topic
 
try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
    for (long i = 0; i < 10; i++) {
        final String orderId = "id" + Long.toString(i);
        final Payment payment = new Payment(orderId, 1000.00d);
        final ProducerRecord<String, Payment> record =
            new ProducerRecord<String, Payment>("transactions", payment.getId().toString(), payment);
        producer.send(record);
    }
} catch (final Exception e) {
    e.printStackTrace();
}
 
package main

import (
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	topic := "test_topic"
	p, _ := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092"})
	defer p.Close()

	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
		Value:          []byte("Hello world")}, nil)
}
 
                Producing to Kafka - No Key Time Partition 1 Partition 2 Partition 3 Messages will be produced in a round robin fashion Partition 4 @rmoff | #GOTOpia | @confluentinc
 
                Producing to Kafka - With Key Time Partition 1 A Partition 2 B hash(key) % numPartitions = N Partition 3 C Partition 4 D @rmoff | #GOTOpia | @confluentinc
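To make the hash(key) % numPartitions idea concrete, here is a minimal, hedged sketch in Java. It is not Kafka's actual default partitioner (which uses murmur2 over the serialised key bytes); the hash function and the four-partition topic are stand-ins purely to show why the same key always lands on the same partition.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitioningSketch {
    // Stand-in for the default partitioner: hash(key) % numPartitions.
    // (Kafka really uses murmur2 over the serialised key bytes; any stable hash shows the idea.)
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions;   // keep the result non-negative
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition, so per-key ordering is preserved.
        for (String key : new String[]{"A", "B", "A", "C", "A"}) {
            System.out.printf("key=%s -> partition %d%n", key, partitionFor(key, 4));
        }
    }
}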
 
Producers partition 0 … partition 1 producer … partition 2 … Partitioned Topic • A client application • Puts messages into topics • Handles partitioning, network protocol • Java, Go, .NET, C/C++, Python • Also every other language (plus REST Proxy if not)
 
                PUB / SUB @rmoff | #GOTOpia | @confluentinc
 
                Consuming data - access is only sequential Read to offset & scan Old New @rmoff | #GOTOpia | @confluentinc
 
Consumers have a position of their own Old New Sally is here Scan @rmoff | #GOTOpia | @confluentinc
 
Consumers have a position of their own Old New Fred is here Sally is here Scan Scan @rmoff | #GOTOpia | @confluentinc
 
Consumers have a position of their own Old New Rick is here Fred is here Sally is here Scan Scan Scan @rmoff | #GOTOpia | @confluentinc
 
try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
    while (true) {
        ConsumerRecords<String, Payment> records = consumer.poll(100);
        for (ConsumerRecord<String, Payment> record : records) {
            String key = record.key();
            Payment value = record.value();
            System.out.printf("key = %s, value = %s%n", key, value);
        }
    }
}
 
c, _ := kafka.NewConsumer(&cm)
defer c.Close()

c.Subscribe(topic, nil)

for {
	select {
	case ev := <-c.Events():
		switch ev.(type) {
		case *kafka.Message:
			km := ev.(*kafka.Message)
			fmt.Printf("✅ Message '%v' received from topic '%v'\n",
				string(km.Value), string(*km.TopicPartition.Topic))
		}
	}
}
 
                Consuming From Kafka - Single Consumer Partition 1 Partition 2 C Partition 3 Partition 4 @rmoff | #GOTOpia | @confluentinc
 
                Consuming From Kafka - Multiple Consumers Partition 1 C1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc
 
                Consuming From Kafka - Multiple Consumers C1 Partition 1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc
 
Consuming From Kafka - Grouped Consumers C1 C1 Partition 1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc
 
                CONSUMERS CONSUMER GROUP COORDINATOR CONSUMER GROUP
 
                Consuming From Kafka - Grouped Consumers Partition 1 Partition 2 Partition 3 C1 C2 C3 C4 Partition 4 @rmoff | #GOTOpia | @confluentinc
 
Consuming From Kafka - Grouped Consumers Partition 1 Partition 2 Partition 3 C1 C2 C3 Partition 4 @rmoff | #GOTOpia | @confluentinc
 
                Consuming From Kafka - Grouped Consumers Partition 1 C1 Partition 2 Partition 3 C2 C3 Partition 4 @rmoff | #GOTOpia | @confluentinc
 
Consumers partition 0 … partition 1 … consumer A consumer A consumer A partition 2 … Partitioned Topic consumer B • A client application • Reads messages from topics • Horizontally, elastically scalable (if stateless) • Java, Go, .NET, C/C++, Python, everything else (plus REST Proxy if not)
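One detail the consumer-group slides imply but the earlier consumer snippet doesn't show: the only thing that puts consumers into the same group is a shared group.id. A hedged sketch (topic name, group id, and addresses are placeholders) — run several copies of this and the group coordinator rebalances the topic's partitions across them.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "clicks-processor");          // consumers sharing this id form one group
        props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        // Start N instances of this program: the group coordinator splits the
        // topic's partitions across them and rebalances when instances come and go.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("clicks"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}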
 
                BROKERS and REPLICATION @rmoff | #GOTOpia | @confluentinc
 
                Leader Partition Leadership and Replication Follower Partition 1 Partition 2 Partition 3 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc
 
                Leader Partition Leadership and Replication Follower Partition 1 Partition 1 Partition 1 Partition 2 Partition 2 Partition 2 Partition 3 Partition 3 Partition 3 Partition 4 Partition 4 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc
 
                Leader Partition Leadership and Replication Follower Partition 1 Partition 1 Partition 1 Partition 2 Partition 2 Partition 2 Partition 3 Partition 3 Partition 3 Partition 4 Partition 4 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc
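For reference, replication is set per topic when the topic is created. A minimal sketch with the Java AdminClient, assuming three brokers as in the picture above; the topic name and bootstrap addresses are illustrative.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 4 partitions, each replicated to 3 brokers: one leader plus two followers per partition.
            NewTopic clicks = new NewTopic("clicks", 4, (short) 3);
            admin.createTopics(Collections.singletonList(clicks)).all().get();
        }
    }
}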
 
Photo by Raoul Droog on Unsplash DEMO @rmoff | #GOTOpia | @confluentinc
 
                So far, this is Pretty good @rmoff | #GOTOpia | @confluentinc
 
                So far, this is Pretty good but I’ve not finished yet… @rmoff | #GOTOpia | @confluentinc
 
                Streaming Pipelines Amazon S3 RDBMS HDFS @rmoff | #GOTOpia | @confluentinc
 
                Evolve processing from old systems to new Existing New App <x> App RDBMS @rmoff | #GOTOpia | @confluentinc
 
                 
Streaming Integration with Kafka Connect syslog Sources Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc
 
Streaming Integration with Kafka Connect Amazon Sinks Google Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc
 
Streaming Integration with Kafka Connect Amazon syslog Google Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc
 
Look Ma, No Code!
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}
@rmoff | #GOTOpia | @confluentinc
 
Extensible Connector Transform(s) Converter @rmoff | #GOTOpia | @confluentinc
 
                hub.confluent.io @rmoff | #GOTOpia | @confluentinc
 
                Fault-tolerant? Nope. Kafka Connect Standalone Worker S3 Task #1 JDBC Task #1 JDBC Task #2 Offsets Worker @rmoff | #GOTOpia | @confluentinc
 
                Fault-tolerant? Yeah! Kafka Connect Distributed Worker S3 Task #1 JDBC Task #1 JDBC Task #2 Kafka Connect cluster Worker Offsets Config Status @rmoff | #GOTOpia | @confluentinc
 
                Fault-tolerant? Yeah! Scaling the Distributed Worker S3 Task #1 JDBC Task #1 Kafka Connect cluster JDBC Task #2 Worker Worker Offsets Config Status @rmoff | #GOTOpia | @confluentinc
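On a distributed Kafka Connect cluster, connectors are created by POSTing their JSON config to any worker's REST API (port 8083 by default), and the cluster stores config, offsets, and status in Kafka topics as shown above. A sketch that submits the earlier JDBC source config; the connector name and worker address are assumptions.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateConnector {
    public static void main(String[] args) throws Exception {
        // Connector name is illustrative; the config block mirrors the "Look Ma, No Code!" slide.
        String config = """
            {
              "name": "jdbc-source-demo",
              "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://asgard:3306/demo",
                "table.whitelist": "sales,orders,customers"
              }
            }
            """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))   // any worker in the cluster
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(config))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}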
 
                Schema Registry @rmoff | #GOTOpia | @confluentinc
 
                K V
 
K V Wait… what's this?
 
How do you serialise your data? Avro Protobuf JSON Schema JSON CSV @rmoff | #GOTOpia | @confluentinc
 
APIs are contracts between services {user_id: 53, address: "2 Elm st."} Profile service Quote service {user_id: 53, quote: 580} @rmoff | #GOTOpia | @confluentinc
 
But not all services {user_id: 53, address: "2 Elm st."} Profile service Quote service {user_id: 53, quote: 580} @rmoff | #GOTOpia | @confluentinc
 
And naturally… {user_id: 53, address: "2 Elm st."} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc
 
Schemas are about how teams work together {user_id: 53, timestamp: 1497842472} new Date(timestamp) Profile service Quote service Profile database create table ( user_id number, timestamp number) Stream processing @rmoff | #GOTOpia | @confluentinc
 
Things change… {user_id: 53, timestamp: "June 28, 2017 4:00pm"} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc
 
Moving fast and breaking things {user_id: 53, timestamp: "June 28, 2017 4:00pm"} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc
 
                Lack of schemas – Coupling teams and services 2001 2001 Citrus Heights-Sunrise Blvd Citrus_Hghts 60670001 3400293 34 SAC Sacramento SV Sacramento Valley SAC Sacramento County APCD SMA8 Sacramento Metropolitan Area CA 6920 Sacramento 28 6920 13588 7400 Sunrise Blvd 95610 38 41 56 38.6988889 121 16 15.98999977 -121.271111 10 4284781 650345 52 @rmoff | #GOTOpia | @confluentinc
 
Serialisation & Schemas Avro Protobuf JSON Schema JSON CSV @rmoff | #GOTOpia | @confluentinc
 
Serialisation & Schemas Avro Protobuf JSON Schema JSON CSV https://rmoff.dev/qcon-schemas @rmoff | #GOTOpia | @confluentinc
 
                It isn’t just about the services Software Teams Engineering & Culture Data & Metadata @rmoff | #GOTOpia | @confluentinc
 
                Schemas Schema Registry Topic producer … consumer
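Wiring a producer to the Schema Registry is mostly client configuration: the Avro serialiser registers (or looks up) the value schema and writes only a short schema id plus the Avro-encoded payload to the topic. A hedged sketch; topic name, schema, and the localhost addresses are placeholders.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        // Confluent's Avro serialiser: registers the schema and embeds only its id in each message
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        // An illustrative Avro schema for the payment value (registered under "transactions-value")
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Payment\",\"fields\":[" +
            "{\"name\":\"id\",\"type\":\"string\"}," +
            "{\"name\":\"amount\",\"type\":\"double\"}]}");

        GenericRecord payment = new GenericData.Record(schema);
        payment.put("id", "id0");
        payment.put("amount", 1000.00d);

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("transactions", "id0", payment));
        }
    }
}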
 
                partition 0 consumer A … consumer A partition 1 … consumer A partition 2 … consumer B Partitioned Topic @rmoff | #GOTOpia | @confluentinc
 
                consumer A consumer A consumer A @rmoff | #GOTOpia | @confluentinc
 
{ "reading_ts": "2020-02-14T12:19:27Z", "sensor_id": "aa-101", "production_line": "w01", "widget_type": "acme94", "temp_celcius": 23, "widget_weight_g": 100 } Photo by Franck V. on Unsplash @rmoff | #GOTOpia | @confluentinc
 
                Streams of events Time @rmoff | #GOTOpia | @confluentinc
 
                Stream Processing Stream: widgets Stream: widgets_red @rmoff | #GOTOpia | @confluentinc
 
Stream Processing with Kafka Streams Stream: widgets
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
       .filter((key, widget) -> widget.getColour().equals("RED"))
       .to("widgets_red", Produced.with(stringSerde, widgetsSerde));
Stream: widgets_red
@rmoff | #GOTOpia | @confluentinc
 
                consumer A consumer A consumer A @rmoff | #GOTOpia | @confluentinc
 
                Streams Application Streams Application Streams Application @rmoff | #GOTOpia | @confluentinc
 
Properties streamsConfiguration = getProperties(SCHEMA_REGISTRY_URL);
final Map<String, String> serdeConfig =
    Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
final SpecificAvroSerde<Movie> movieSerde = getMovieAvroSerde(serdeConfig);
final SpecificAvroSerde<Rating> ratingSerde = getRatingAvroSerde(serdeConfig);
final SpecificAvroSerde<RatedMovie> ratedMovieSerde = new SpecificAvroSerde<>();
ratedMovieSerde.configure(serdeConfig, false);

StreamsBuilder builder = new StreamsBuilder();
KTable<Long, Double> ratingAverage = getRatingAverageTable(builder);
getRatedMoviesTable(builder, ratingAverage, movieSerde);
Topology topology = builder.build();

KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
}

private static SpecificAvroSerde<Rating> getRatingAvroSerde(Map<String, String> serdeConfig) {
    final SpecificAvroSerde<Rating> ratingSerde = new SpecificAvroSerde<>();
    ratingSerde.configure(serdeConfig, false);
    return ratingSerde;
 
final SpecificAvroSerde<Movie> movieSerde = new SpecificAvroSerde<>();
movieSerde.configure(serdeConfig, false);
return movieSerde;
}

public static KTable<Long, String> getRatedMoviesTable(StreamsBuilder builder,
                                                       KTable<Long, Double> ratingAverage,
                                                       SpecificAvroSerde<Movie> movieSerde) {
    builder.stream("raw-movies", Consumed.with(Serdes.Long(), Serdes.String()))
        .mapValues(Parser::parseMovie)
        .map((key, movie) -> new KeyValue<>(movie.getMovieId(), movie))
        .to("movies", Produced.with(Serdes.Long(), movieSerde));

    KTable<Long, Movie> movies = builder.table("movies",
        Materialized.<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
            .withValueSerde(movieSerde)
            .withKeySerde(Serdes.Long())
    );

    KTable<Long, String> ratedMovies = ratingAverage
        .join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);

    ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
    return ratedMovies;
}
 
    .join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);
ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
return ratedMovies;
}

public static KTable<Long, Double> getRatingAverageTable(StreamsBuilder builder) {
    KStream<Long, String> rawRatings = builder.stream("raw-ratings", Consumed.with(Serdes.Long(), Serdes.String()));
    KStream<Long, Rating> ratings = rawRatings.mapValues(Parser::parseRating)
        .map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating));
    KStream<Long, Double> numericRatings = ratings.mapValues(Rating::getRating);

    KGroupedStream<Long, Double> ratingsById = numericRatings.groupByKey();
    KTable<Long, Long> ratingCounts = ratingsById.count();
    KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);

    KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
        (sum, count) -> sum / count.doubleValue(),
        Materialized.as("average-ratings"));

    ratingAverage.toStream()
        /*.peek((key, value) -> { // debug only
            System.out.println("key = " + key + ", value = " + value);
        })*/
        .to("average-ratings");
    return ratingAverage;
 
KTable<Long, Movie> movies = builder.table("movies",
    Materialized.<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
        .withValueSerde(movieSerde)
        .withKeySerde(Serdes.Long())
);
• Java API • Filter, join, aggregate, etc. • Locates stream processing with your application • Scales like a Consumer Group (but better!)
 
Stream Processing with ksqlDB Stream: widgets
ksqlDB
CREATE STREAM widgets_red AS
  SELECT * FROM widgets WHERE colour='RED';
Stream: widgets_red
@rmoff | #GOTOpia | @confluentinc
 
{ "reading_ts": "2020-02-14T12:19:27Z", "sensor_id": "aa-101", "production_line": "w01", "widget_type": "acme94", "temp_celcius": 23, "widget_weight_g": 100 } Photo by Franck V. on Unsplash @rmoff | #GOTOpia | @confluentinc
 
{ "reading_ts": "2020-02-14T12:19:27Z", "sensor_id": "aa-101", "production_line": "w01", "widget_type": "acme94", "temp_celcius": 23, "widget_weight_g": 100 }
SELECT * FROM WIDGETS WHERE WEIGHT_G > 120
SELECT COUNT(*) FROM WIDGETS GROUP BY PRODUCTION_LINE
SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP>20
CREATE SINK CONNECTOR dw WITH ('connector.class' = 'S3Connector', 'topics' = 'widgets' …); → Object store, data warehouse, RDBMS
Photo by Franck V. on Unsplash @rmoff | #GOTOpia | @confluentinc
 
                ksqlDB The event streaming database purpose-built for stream processing applications. @rmoff | #GOTOpia | @confluentinc
 
                Stream Processing with ksqlDB Source stream @rmoff | #GOTOpia | @confluentinc
 
                Stream Processing with ksqlDB Source stream @rmoff | #GOTOpia | @confluentinc
 
                Stream Processing with ksqlDB Source stream @rmoff | #GOTOpia | @confluentinc
 
                Stream Processing with ksqlDB Source stream Analytics @rmoff | #GOTOpia | @confluentinc
 
                Stream Processing with ksqlDB Source stream Applications / Microservices @rmoff | #GOTOpia | @confluentinc
 
Stream Processing with ksqlDB …SUM(TXN_AMT) GROUP BY AC_ID AC_ID=42 → BALANCE=94.00 Source stream Applications / Microservices @rmoff | #GOTOpia | @confluentinc
 
Lookups and Joins with ksqlDB ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} @rmoff | #GOTOpia | @confluentinc
 
Lookups and Joins with ksqlDB ITEMS {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9} ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} @rmoff | #GOTOpia | @confluentinc
 
Lookups and Joins with ksqlDB ITEMS {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9} ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
ksqlDB
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*, O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM ORDERS O
       INNER JOIN ITEMS I
       ON O.ITEMID = I.ID;
@rmoff | #GOTOpia | @confluentinc
 
Lookups and Joins with ksqlDB ITEMS {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9} ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
ksqlDB
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*, O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM ORDERS O
       INNER JOIN ITEMS I
       ON O.ITEMID = I.ID;
ORDERS_ENRICHED { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9, "total_order_value": 99.5 }
@rmoff | #GOTOpia | @confluentinc
 
                Streams & Tables @rmoff | #GOTOpia | @confluentinc
 
Streams and Tables
Kafka topic (k/v bytes):
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
ksqlDB Stream (Stream: Topic + Schema):
+--------------------+--------+----------+
|EVENT_TS            |PERSON  |LOCATION  |
+--------------------+--------+----------+
|2020-02-17 15:22:00 |robin   |Leeds     |
|2020-02-17 17:23:00 |robin   |London    |
|2020-02-17 22:23:00 |robin   |Wakefield |
|2020-02-18 09:00:00 |robin   |Leeds     |
ksqlDB Table (Table: state for a given key, Topic + Schema):
+--------+----------+
|PERSON  |LOCATION  |
+--------+----------+
|robin   |Leeds     |
@rmoff | #GOTOpia | @confluentinc
 
Stateful aggregations in ksqlDB
Kafka topic:
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
SELECT PERSON, COUNT(*) FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  | LOCATION_CHANGES |
+--------+------------------+
|robin   | 4                |
SELECT PERSON, COUNT_DISTINCT(LOCATION) FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  | UNIQUE_LOCATIONS |
+--------+------------------+
|robin   | 3                |
Aggregations can be across the entire input, or windowed (TUMBLING, HOPPING, SESSION)
@rmoff | #GOTOpia | @confluentinc
 
Stateful aggregations in ksqlDB
Kafka topic:
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;
PERSON_MOVEMENTS (internal ksqlDB state store)
@rmoff | #GOTOpia | @confluentinc
 
Pull and Push queries in ksqlDB
Kafka topic:
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;
PERSON_MOVEMENTS (internal ksqlDB state store)
Pull query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin';
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|3                 |3                 |
Query terminated
Push query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin' EMIT CHANGES;
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|1                 |1                 |
|2                 |2                 |
|3                 |3                 |
|4                 |3                 |
Press CTRL-C to interrupt
@rmoff | #GOTOpia | @confluentinc
 
ksqlDB REST API
Kafka topic:
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;
PERSON_MOVEMENTS (internal ksqlDB state store)
curl -s -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='\''robin'\'';"}'
{"value":"3"}
@rmoff | #GOTOpia | @confluentinc
 
Pull and Push queries in ksqlDB
             Pull query            Push query
Tells you:   Point-in-time value   All value changes
Exits:       Immediately           Never
@rmoff | #GOTOpia | @confluentinc
 
                ksqlDB or Kafka Streams? @rmoff | #GOTOpia | @confluentinc Photo by Ramiz Dedaković on Unsplash
 
Standing on the Shoulders of Streaming Giants
Ease of use ▲
  ksqlDB (+ ksqlDB UDFs) — powered by →
  Kafka Streams — powered by →
  Producer, Consumer APIs
Flexibility ▼
@rmoff | #GOTOpia | @confluentinc
 
Photo by Raoul Droog on Unsplash DEMO @rmoff | #GOTOpia | @confluentinc
 
                Summary @rmoff | #GOTOpia | @confluentinc
 
                @rmoff | #GOTOpia | @confluentinc
 
K V @rmoff | #GOTOpia | @confluentinc
 
                The Log @rmoff | #GOTOpia | @confluentinc
 
                Producer Consumer The Log @rmoff | #GOTOpia | @confluentinc
 
                Producer Consumer Connectors The Log @rmoff | #GOTOpia | @confluentinc
 
                Producer Consumer Connectors The Log Streaming Engine @rmoff | #GOTOpia | @confluentinc
 
                Apache Kafka Producer Consumer Connectors The Log Streaming Engine @rmoff | #GOTOpia | @confluentinc
 
                Confluent Platform ksqlDB Producer Consumer Connectors The Log Schema Registry Streaming Engine @rmoff | #GOTOpia | @confluentinc
 
                EVENTS are EVERYWHERE @rmoff | #GOTOpia | @confluentinc
 
EVENTS are very POWERFUL @rmoff | #GOTOpia | @confluentinc
 
                Standby for resource links… @rmoff | #GOTOpia | @confluentinc
 
                Free Books! https://rmoff.dev/gotopia @rmoff | #GOTOpia | @confluentinc
 
Fully Managed Kafka as a Service
$50 USD off your bill each calendar month for the first three months when you sign up: https://rmoff.dev/ccloud
Free money! Promo code 60DEVADV for an additional $60 towards your bill
* T&C: https://www.confluent.io/confluent-cloud-promo-disclaimer
 
                Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io
 
                Confluent Community Slack group cnfl.io/slack @rmoff | #GOTOpia | @confluentinc
 
                #EOF @rmoff rmoff.dev/talks youtube.com/rmoff
