Вычислить среднюю скорость дороги [закрыто]

20

Я пошел на собеседование по работе с инженером данных Интервьюер задал мне вопрос. Он дал мне некоторую ситуацию и попросил спроектировать поток данных для этой системы. Я решил это, но ему не понравилось мое решение, и я потерпел неудачу. Я хотел бы знать, если у вас есть лучшие идеи, как решить эту проблему.

Вопрос был:

Наша система получает четыре потока данных. Данные содержат идентификатор автомобиля, скорость и координаты геолокации. Каждый автомобиль отправляет свои данные раз в минуту. Нет никакой связи между конкретным потоком, конкретной дорогой, транспортным средством или чем-то еще. Есть функция, которая принимает координаты и возвращает название участка дороги. Нам нужно знать среднюю скорость на участок дороги за 5 минут. Наконец, мы хотим записать результаты в Кафку.

введите описание изображения здесь

Итак, мое решение было:

Сначала запишите все данные в кластер Кафки, в одну тему, разделенную на 5-6 первых цифр широты, соединенных с 5-6 первыми цифрами долготы. Затем считывание данных с помощью структурированной потоковой передачи, добавление для каждой строки имени участка дороги по координатам (для этого есть предварительно определенный udf), а затем сбор данных по имени участка дороги.

Поскольку я делю данные в Kafka по 5-6 первым цифрам координат, после перевода координат в имя раздела нет необходимости передавать большой объем данных в правильный раздел, и поэтому я могу воспользоваться операцией colesce () это не вызывает полное перемешивание.

Затем рассчитывается средняя скорость на одного исполнителя.

Весь процесс будет происходить каждые 5 минут, и мы будем записывать данные в режиме добавления в конечный приемник Kafka.

введите описание изображения здесь

Опять же, интервьюеру не понравилось мое решение. Кто-нибудь может подсказать, как его улучшить или совершенно другую и лучшую идею?

Alon
источник
Не лучше ли спросить человека, что ему не нравится?
Джино Пан
Я думаю, что это плохая идея разбивать на конкатенированные значения lat-long. Не будет ли точка данных для каждой полосы представлена ​​в виде немного другой координаты?
Уэббер
@webber, поэтому я беру только несколько цифр, поэтому позиция не будет уникальной, но относительно размера участка дороги.
Алон

Ответы:

6

Я нашел этот вопрос очень интересным и подумал о том, чтобы попытаться его решить.

Как я оценил далее, ваша попытка сама по себе хороша, за исключением следующего:

разделен на 5-6 первых цифр широты, соединенных с 5-6 первыми цифрами долготы

Если у вас уже есть метод для получения идентификатора / имени участка дороги на основе широты и долготы, почему бы сначала не вызвать этот метод и использовать идентификатор / имя участка дороги для разделения данных?

И после этого все довольно просто, поэтому топология будет

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key -> 
Use time windowed aggregation for the given time ->
Materialize it to a store. 

(Более подробное объяснение можно найти в комментариях в коде ниже. Пожалуйста, спросите, если что-то неясно)

Я добавил код в конце этого ответа, обратите внимание, что вместо среднего я использовал сумму, поскольку это проще продемонстрировать. Можно сделать среднее, сохраняя некоторые дополнительные данные.

Я подробно изложил ответ в комментариях. Ниже приведена схема топологии, сгенерированная из кода (спасибо https://zz85.github.io/kafka-streams-viz/ )

Топология:

Диаграмма топологии

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class VehicleStream {
        // 5 minutes aggregation window
        private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();

            // Setting configs, change accordingly
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // initializing  a streambuilder for building topology.
            final StreamsBuilder builder = new StreamsBuilder();

            // Our initial 4 streams.
            List<String> streamInputTopics = Arrays.asList(
                    "vehicle.stream1", "vehicle.stream2",
                    "vehicle.stream3", "vehicle.stream4"
            );
            /*
             * Since there is no connection between a specific stream
             * to a specific road or vehicle or anything else,
             * we can take all four streams as a single stream
             */
            KStream<String, String> source = builder.stream(streamInputTopics);

            /*
             * The initial key is unimportant (which can be ignored),
             * Instead, we will be using the section name/id as key.
             * Data will contain comma separated values in following format.
             * VehicleId,Speed,Latitude,Longitude
             */
            WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
                    "windowSpeedStore",
                    AGGREGATION_WINDOW,
                    2, 10, true
            );
            source
                    .peek((k, v) -> printValues("Initial", k, v))
                    // First, we rekey the stream based on the road section.
                    .selectKey(VehicleStream::selectKeyAsRoadSection)
                    .peek((k, v) -> printValues("After rekey", k, v))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
                    .aggregate(
                            () -> "0.0", // Initialize
                            /*
                             * I'm using summing here for the aggregation as that's easier.
                             * It can be converted to average by storing extra details on number of records, etc..
                             */
                            (k, v, previousSpeed) ->  // Aggregator (summing speed)
                                    String.valueOf(
                                            Double.parseDouble(previousSpeed) +
                                                    VehicleSpeed.getVehicleSpeed(v).speed
                                    ),
                            Materialized.as(windowSpeedStore)
                    );
            // generating the topology
            final Topology topology = builder.build();
            System.out.print(topology.describe());

            // constructing a streams client with the properties and topology
            final KafkaStreams streams = new KafkaStreams(topology, properties);
            final CountDownLatch latch = new CountDownLatch(1);

            // attaching shutdown handler
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }


        private static void printValues(String message, String key, Object value) {
            System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
        }

        private static String selectKeyAsRoadSection(String key, String speedValue) {
            // Would make more sense when it's the section id, rather than a name.
            return coordinateToRoadSection(
                    VehicleSpeed.getVehicleSpeed(speedValue).latitude,
                    VehicleSpeed.getVehicleSpeed(speedValue).longitude
            );
        }

        private static String coordinateToRoadSection(String latitude, String longitude) {
            // Dummy function
            return "Area 51";
        }

        public static class VehicleSpeed {
            public String vehicleId;
            public double speed;
            public String latitude;
            public String longitude;

            public static VehicleSpeed getVehicleSpeed(String data) {
                return new VehicleSpeed(data);
            }

            public VehicleSpeed(String data) {
                String[] dataArray = data.split(",");
                this.vehicleId = dataArray[0];
                this.speed = Double.parseDouble(dataArray[1]);
                this.latitude = dataArray[2];
                this.longitude = dataArray[3];
            }

            @Override
            public String toString() {
                return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
            }
        }
    }
Иршад П.И.
источник
Разве объединение всех потоков не является плохой идеей? Это может стать узким местом для вашего потока данных. Что происходит, когда вы начинаете получать все больше и больше потоков ввода по мере роста вашей системы? Это будет масштабируемым?
Wypul
@wypul> не объединяет ли все потоки плохую идею? -> Я думаю нет. Параллелизм в Kafka достигается не с помощью потоков, а с помощью разделов (и задач), потоков и т. Д. Потоки - это способ группировки данных. > Будет ли это масштабируемым? -> да. Поскольку мы указываем участки дороги и предполагаем, что участки дороги достаточно распределены, мы можем увеличить количество разделов для этих тем, чтобы параллельно обрабатывать поток в разных контейнерах. Мы можем использовать хороший алгоритм разбиения на основе участка дороги, чтобы распределить нагрузку по репликам.
Иршад П.И.
1

Проблема как таковая кажется простой, и предлагаемые решения уже имеют большой смысл. Мне интересно, беспокоился ли интервьюер о дизайне и производительности решения, на котором вы сосредоточились, или о точности результата. Поскольку другие сосредоточились на коде, дизайне и производительности, я буду весить на точность.

Потоковое решение

По мере поступления данных мы можем дать приблизительную оценку средней скорости дороги. Эта оценка будет полезна при обнаружении заторов, но будет отключена при определении ограничения скорости.

  1. Объедините все 4 потока данных вместе.
  2. Создайте окно на 5 минут для сбора данных со всех 4 потоков за 5 минут.
  3. Примените UDF к координатам, чтобы получить название улицы и название города. Названия улиц часто повторяются в разных городах, поэтому мы будем использовать название города + название улицы в качестве ключа.
  4. Рассчитать среднюю скорость с синтаксисом вроде -

    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

5. write the result to the Kafka Topic

Пакетное решение

Эта оценка будет отклонена, потому что размер выборки невелик. Нам потребуется пакетная обработка данных за весь месяц / квартал / год, чтобы более точно определить ограничение скорости.

  1. Прочитайте данные за годы из озера данных (или Кафка Тема)

  2. Примените UDF к координатам, чтобы получить название улицы и название города.

  3. Рассчитать среднюю скорость с синтаксисом вроде -


    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

  1. записать результат в озеро данных.

На основе этого более точного ограничения скорости мы можем прогнозировать медленный трафик в потоковом приложении.

Салим
источник
1

Я вижу несколько проблем с вашей стратегией разделения:

  • Когда вы говорите, что собираетесь разделить свои данные на основе первых 5-6 цифр лат длиной, вы не сможете заранее определить количество разделов kafka. У вас будут искаженные данные, так как на некоторых участках дороги вы будете наблюдать больший объем, чем на других.

  • И ваша комбинация клавиш в любом случае не гарантирует одинаковые данные участка дороги в одном и том же разделе, и, следовательно, вы не можете быть уверены, что не будет перетасовок.

Предоставленной ИМО информации недостаточно для проектирования всего конвейера данных. Потому что при разработке конвейера важную роль играет способ разделения данных. Вам следует узнать больше о данных, которые вы получаете, например, количество транспортных средств, размер входных потоков данных, фиксированное количество потоков или оно может увеличиться в будущем? Являются ли потоки входных данных, которые вы получаете, являются потоками Кафки? Сколько данных вы получите за 5 минут?

  • Теперь давайте предположим, что у вас есть 4 потока, записанных в 4 темы в kafka или 4 разделах, и у вас нет какого-либо конкретного ключа, но ваши данные разделены на основе какого-либо ключа центра обработки данных или хэш-разделен. Если нет, то это должно быть сделано на стороне данных, а не дедуплицировать данные в другом потоке kafka и разделять.
  • Если вы получаете данные в разных дата-центрах, вам нужно перенести данные в один кластер, и для этого вы можете использовать Kafka mirror maker или что-то подобное.
  • После того, как у вас есть все данные в одном кластере, вы можете запустить там структурированное потоковое задание с 5-минутным интервалом запуска и водяным знаком в зависимости от ваших требований.
  • Для расчета средних и избежать много перетасовки вы можете использовать комбинацию из mapValuesи reduceByKeyвместо GroupBy. Отослать это .
  • Вы можете записать данные в приемник кафки после обработки.
wypul
источник
mapValues ​​и reduByKey относятся к низкоуровневому СДР. Разве Catalyst не достаточно умен, чтобы генерировать наиболее эффективные СДР, когда я группирую и вычисляю среднее значение?
Алон
@Alon Catalyst наверняка сможет определить лучший план для выполнения вашего запроса, но если вы используете groupBy, данные с тем же ключом будут сначала перетаскиваться в тот же раздел, а затем применять к нему агрегированную операцию. mapValuesи reduceByдействительно относится к низкоуровневому СДР, но все равно будет работать лучше в этой ситуации, так как сначала будет рассчитывать агрегат на раздел, а затем выполнять перемешивание.
wypul
0

Основные проблемы, которые я вижу с этим решением:

  • Участки дороги, которые находятся на краю 6-значных квадратов карты, будут содержать данные в нескольких тематических разделах и будут иметь несколько средних скоростей.
  • Размер данных для ваших разделов Kafka может быть несбалансированным (город против пустыни). Разделение по идентификатору машины по первым цифрам может быть хорошей идеей для ИМО.
  • Не уверен, что я следовал за объединяющей частью, но это кажется проблематичным.

Я бы сказал, что решение нужно сделать: читать из потока Кафки -> UDF -> секция группового пути -> средний -> записывать в поток Кафки.

Дэвид Тауб
источник
0

Мой дизайн будет зависеть от

  1. Количество дорог
  2. Количество машин
  3. Расчет стоимости дороги по координатам

Если я хочу масштабировать для любого количества отсчетов, дизайн будет выглядеть так введите описание изображения здесь

Перекрестные заботы об этом дизайне -

  1. Поддерживать постоянное состояние входных потоков (если вводом является kafka, мы можем хранить смещения с помощью Kafka или извне)
  2. Периодически состояния контрольных точек для внешней системы (я предпочитаю использовать асинхронные барьеры контрольных точек во Flink )

Возможны некоторые практические улучшения этого дизайна -

  1. Функция кэширования участков дороги, если возможно, на основе дорог
  2. Обработка пропущенных пингов (на практике не каждый пинг доступен)
  3. Принимая во внимание кривизну дороги (с учетом и высоты)
yugandhar
источник