Компьютер: ты делаешь математику

13

Эта задача частично является задачей алгоритмов, включает в себя некоторую математику и частично является самой быстрой задачей кода.

Для некоторого положительного целого числа nрассмотрим равномерно случайную строку 1s и 0s длины nи назовите ее A. Теперь также рассмотрим вторую равномерно выбранную случайную строку длины n, значения которой -1, 0,или, 1и назовите ееB_pre . Теперь позвольте Bбыть B_pre+ B_pre. Это связано B_preс собой.

Теперь рассмотрим внутренний продукт A и B[j,...,j+n-1]и назовем его Z_jи индекс из 1.

задача

На выходе должен быть список n+1фракций. У iтерма в выводе должна быть точная вероятность того, что все первые iчлены Z_jс j <= iравными 0.

Гол

Самый большой, nдля которого ваш код дает правильный вывод менее чем за 10 минут на моей машине.

Tie Breaker

Если два ответа имеют одинаковую оценку, победит тот, который отправил первый.

В очень (очень) маловероятном случае, когда кто-то найдет способ получить неограниченное количество баллов, будет принято первое действительное доказательство такого решения.

намек

Не пытайтесь решить эту проблему математически, это слишком сложно. На мой взгляд, лучший способ - вернуться к базовым определениям вероятности из старшей школы и найти умные способы получить код, чтобы сделать исчерпывающий перечень возможностей.

Языки и библиотеки

Вы можете использовать любой язык со свободно доступным компилятором / интерпретатором и т. Д. для Linux и любых библиотек, которые также свободно доступны для Linux.

Моя машина Время будет работать на моей машине. Это стандартная установка Ubuntu на восьмиъядерный процессор AMD FX-8350. Это также означает, что мне нужно иметь возможность запустить ваш код. Как следствие, используйте только легкодоступное бесплатное программное обеспечение и, пожалуйста, включите подробные инструкции по компиляции и запуску вашего кода.


Некоторые тестовые выводы. Рассмотрим только первый вывод для каждого n. Это когда i=1. Для nот 1 до 13 они должны быть.

 1: 4/6
 2: 18/36
 3: 88/216
 4: 454/1296
 5: 2424/7776
 6: 13236/46656
 7: 73392/279936
 8: 411462/1679616
 9: 2325976/10077696
10: 13233628/60466176
11: 75682512/362797056
12: 434662684/2176782336
13: 2505229744/13060694016

Вы также можете найти общую формулу для i=1на http://oeis.org/A081671 .

Таблица лидеров (разделена по языкам)

  • n = 15. Python + параллельный питон + pypy за 1 мин 49 с Jakube
  • n = 17. C ++ за 3 минуты 37 секунд Кит Рэндалл
  • n = 16. C ++ за 2 минуты 38 с. kuroi neko
Мартин Эндер
источник
1
@Knerd Как я могу сказать нет. Я постараюсь понять, как запустить ваш код в Linux, но любая помощь очень ценится.
Хорошо, извините за удаление комментариев. Для всего, что не читалось, это было, если F # или C # позволены :)
Knerd
Другой вопрос, у вас может быть пример правильного ввода?
Кнерд
Какая у вас графическая карта? Похоже, работа для GPU.
Майкл М.
1
@Knerd Вместо этого я добавил таблицу вероятностей в вопрос. Я надеюсь, что это полезно.

Ответы:

5

C ++, n = 18 за 9 минут на 8 потоков

(Дайте мне знать, если на вашей машине это займет не более 10 минут.)

Я использую несколько форм симметрии в массиве B. Это циклический (сдвиг на одну позицию), реверс (обратный порядок элементов) и знак (взять отрицательный для каждого элемента). Сначала я вычисляю список Bs, которые нам нужно попробовать, и их вес. Затем каждый B проходит через быструю подпрограмму (с использованием инструкций подсчета битов) для всех 2 ^ n значений A.

Вот результат для n == 18:

> time ./a.out 18
 1: 16547996212044 / 101559956668416
 2:  3120508430672 / 101559956668416
 3:   620923097438 / 101559956668416
 4:   129930911672 / 101559956668416
 5:    28197139994 / 101559956668416
 6:     6609438092 / 101559956668416
 7:     1873841888 / 101559956668416
 8:      813806426 / 101559956668416
 9:      569051084 / 101559956668416
10:      510821156 / 101559956668416
11:      496652384 / 101559956668416
12:      493092812 / 101559956668416
13:      492186008 / 101559956668416
14:      491947940 / 101559956668416
15:      491889008 / 101559956668416
16:      449710584 / 101559956668416
17:      418254922 / 101559956668416
18:      409373626 / 101559956668416

real    8m55.854s
user    67m58.336s
sys 0m5.607s

Скомпилируйте программу ниже с g++ --std=c++11 -O3 -mpopcnt dot.cc

#include <stdio.h>
#include <stdlib.h>
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>

using namespace std;

typedef long long word;

word n;

void inner(word bpos, word bneg, word w, word *cnt) {
    word maxi = n-1;
    for(word a = (1<<n)-1; a >= 0; a--) {
        word m = a;
        for(word i = maxi; i >= 0; i--, m <<= 1) {
            if(__builtin_popcount(m&bpos) != __builtin_popcount(m&bneg))
                break;
            cnt[i]+=w;
        }
    }
}

word pow(word n, word e) {
    word r = 1;
    for(word i = 0; i < e; i++) r *= n;
    return r;
}

typedef struct {
    word b;
    word weight;
} Bentry;

mutex block;
Bentry *bqueue;
word bhead;
word btail;
word done = -1;

word maxb;

// compute -1*b
word bneg(word b) {
    word w = 1;
    for(word i = 0; i < n; i++, w *= 3) {
        word d = b / w % 3;
        if(d == 1)
            b += w;
        if(d == 2)
            b -= w;
    }
    return b;
}

// rotate b one position
word brot(word b) {
    b *= 3;
    b += b / maxb;
    b %= maxb;
    return b;
}

// reverse b
word brev(word b) {
    word r = 0;
    for(word i = 0; i < n; i++) {
        r *= 3;
        r += b % 3;
        b /= 3;
    }
    return r;
}

// individual thread's work routine
void work(word *cnt) {
    while(true) {
        // get a queue entry to work on
        block.lock();
        if(btail == done) {
            block.unlock();
            return;
        }
        if(bhead == btail) {
            block.unlock();
            this_thread::sleep_for(chrono::microseconds(10));
            continue;
        }
        word i = btail++;
        block.unlock();

        // thread now owns bqueue[i], work on it
        word b = bqueue[i].b;
        word w = 1;
        word bpos = 0;
        word bneg = 0;
        for(word j = 0; j < n; j++, b /= 3) {
            word d = b % 3;
            if(d == 1)
                bpos |= 1 << j;
            if(d == 2)
                bneg |= 1 << j;
        }
        bpos |= bpos << n;
        bneg |= bneg << n;
        inner(bpos, bneg, bqueue[i].weight, cnt);
    }
}

int main(int argc, char *argv[]) {
    n = atoi(argv[1]);

    // allocate work queue
    maxb = pow(3, n);
    bqueue = (Bentry*)(malloc(maxb*sizeof(Bentry)));

    // start worker threads
    word procs = thread::hardware_concurrency();
    vector<thread> threads;
    vector<word*> counts;
    for(word p = 0; p < procs; p++) {
        word *cnt = (word*)calloc(64+n*sizeof(word), 1);
        threads.push_back(thread(work, cnt));
        counts.push_back(cnt);
    }

    // figure out which Bs we actually want to test, and with which weights
    bool *bmark = (bool*)calloc(maxb, 1);
    for(word i = 0; i < maxb; i++) {
        if(bmark[i]) continue;
        word b = i;
        word w = 0;
        for(word j = 0; j < 2; j++) {
            for(word k = 0; k < 2; k++) {
                for(word l = 0; l < n; l++) {
                    if(!bmark[b]) {
                        bmark[b] = true;
                        w++;
                    }
                    b = brot(b);
                }
                b = bneg(b);
            }
            b = brev(b);
        }
        bqueue[bhead].b = i;
        bqueue[bhead].weight = w;
        block.lock();
        bhead++;
        block.unlock();
    }
    block.lock();
    done = bhead;
    block.unlock();

    // add up results from threads
    word *cnt = (word*)calloc(n,sizeof(word));
    for(word p = 0; p < procs; p++) {
        threads[p].join();
        for(int i = 0; i < n; i++) cnt[i] += counts[p][i];
    }
    for(word i = 0; i < n; i++)
        printf("%2lld: %14lld / %14lld\n", i+1, cnt[n-1-i], maxb<<n);
    return 0;
}
Кит Рэндалл
источник
Хорошо, это избавляет меня от дальнейшей работы над моим любимым монстром ...
Спасибо за это. У вас есть текущая победная запись. Мы должны -pthreadснова вспомнить . Я добираюсь до n=17своей машины.
Упс .. Вы должны были получить полную награду. Извините, я пропустил срок.
@Lembik: нет проблем.
Кит Рэндалл
6

Python 2 с использованием pypy и pp: n = 15 за 3 минуты

Также просто грубая сила. Интересно видеть, что я почти получаю ту же скорость, что и курои нэко с C ++. Мой код может достигать n = 12около 5 минут. И я запускаю его только на одном виртуальном ядре.

изменить: уменьшить пространство поиска в n

Я заметил, что циклическое вектор A*из Aпроизводит одни и то же число , как и вероятности ( то же номера) в качестве исходного вектора , Aкогда я перебирать B. Например , вектор (1, 1, 0, 1, 0, 0)имеет ту же вероятность , как каждый из векторов (1, 0, 1, 0, 0, 1), (0, 1, 0, 0, 1, 1), (1, 0, 0, 1, 1, 0), (0, 0, 1, 1, 0, 1)и (0, 1, 1, 0, 1, 0)при выборе случайного образом B. Поэтому мне не нужно перебирать каждый из этих 6 векторов, а только около 1 и заменять count[i] += 1на count[i] += cycle_number.

Это уменьшает сложность от Theta(n) = 6^nдо Theta(n) = 6^n / n. Поэтому n = 13это примерно в 13 раз быстрее, чем моя предыдущая версия. Он рассчитывается n = 13примерно за 2 минуты 20 секунд. Потому n = 14что это все еще слишком медленно. Это займет около 13 минут.

редактировать 2: многоядерное программирование

Не очень доволен следующим улучшением. Я решил также попытаться выполнить мою программу на нескольких ядрах. На моих 2 + 2 ядрах я теперь могу вычислить n = 14примерно за 7 минут. Только улучшение в 2 раза.

Код доступен в этом репозитории github: Ссылка . Многоядерное программирование делает его немного уродливым.

редактировать 3: Сокращение пространства поиска для Aвекторов и Bвекторов

Я заметил ту же зеркальную симметрию для векторов, Aчто и Курой Неко. Все еще не уверен, почему это работает (и если это работает для каждогоn ).

Сокращение пространства поиска для Bвекторов немного умнее. Я заменил генерацию векторов ( itertools.product) собственной функцией. По сути, я начинаю с пустого списка и помещаю его в стек. Пока стек не пуст, я удаляю список, если он не имеет той же длины, что nи я, я генерирую 3 других списка (добавляя -1, 0, 1) и помещая их в стек. У меня такой же список, как nи у меня, я могу оценить суммы.

Теперь, когда я сам генерирую векторы, я могу фильтровать их в зависимости от того, смогу ли я достичь суммы = 0 или нет. Например, если мой вектор Aесть (1, 1, 1, 0, 0), и мой вектор Bвыглядит (1, 1, ?, ?, ?), я знаю, что я не могу заполнить ?значениями, так что A*B = 0. Поэтому мне не нужно перебирать все эти 6 векторов Bформы (1, 1, ?, ?, ?).

Мы можем улучшить это, если проигнорируем значения для 1. Как отмечалось в вопросе, значения для i = 1представляют собой последовательность A081671 . Есть много способов рассчитать их. Я выбираю простое повторение a(n) = (4*(2*n-1)*a(n-1) - 12*(n-1)*a(n-2)) / n. Так как мы можем вычислить i = 1практически за короткое время, мы можем отфильтровать больше векторов B. Например A = (0, 1, 0, 1, 1)и B = (1, -1, ?, ?, ?). Мы можем игнорировать векторы, где первый ? = 1, потому что A * cycled(B) > 0, для всех этих векторов. Я надеюсь, что вы можете следовать. Это, наверное, не лучший пример.

С этим я могу рассчитать n = 15за 6 минут.

редактировать 4:

Быстро реализовал отличную идею Куроя Неко, которая гласит, что Bи -Bдает те же результаты. Ускорение х2. Реализация - это только быстрый взлом. n = 15через 3 минуты.

Код:

Для полного кода посетите Github . Следующий код является только представлением основных функций. Я пропустил импорт, многоядерное программирование, печать результатов, ...

count = [0] * n
count[0] = oeis_A081671(n)

#generating all important vector A
visited = set(); todo = dict()
for A in product((0, 1), repeat=n):
    if A not in visited:
        # generate all vectors, which have the same probability
        # mirrored and cycled vectors
        same_probability_set = set()
        for i in range(n):
            tmp = [A[(i+j) % n] for j in range(n)]
            same_probability_set.add(tuple(tmp))
            same_probability_set.add(tuple(tmp[::-1]))
        visited.update(same_probability_set)
        todo[A] = len(same_probability_set)

# for each vector A, create all possible vectors B
stack = []
for A, cycled_count in dict_A.iteritems():
    ones = [sum(A[i:]) for i in range(n)] + [0]
    # + [0], so that later ones[n] doesn't throw a exception
    stack.append(([0] * n, 0, 0, 0, False))

    while stack:
        B, index, sum1, sum2, used_negative = stack.pop()
        if index < n:
            # fill vector B[index] in all possible ways,
            # so that it's still possible to reach 0.
            if used_negative:
                for v in (-1, 0, 1):
                    sum1_new = sum1 + v * A[index]
                    sum2_new = sum2 + v * A[index - 1 if index else n - 1]
                    if abs(sum1_new) <= ones[index+1]:
                        if abs(sum2_new) <= ones[index] - A[n-1]:
                            C = B[:]
                            C[index] = v
                            stack.append((C, index + 1, sum1_new, sum2_new, True))
            else:
                for v in (0, 1):
                    sum1_new = sum1 + v * A[index]
                    sum2_new = sum2 + v * A[index - 1 if index else n - 1]
                    if abs(sum1_new) <= ones[index+1]:
                        if abs(sum2_new) <= ones[index] - A[n-1]:
                            C = B[:]
                            C[index] = v
                            stack.append((C, index + 1, sum1_new, sum2_new, v == 1))
        else:
            # B is complete, calculate the sums
            count[1] += cycled_count  # we know that the sum = 0 for i = 1
            for i in range(2, n):
                sum_prod = 0
                for j in range(n-i):
                    sum_prod += A[j] * B[i+j]
                for j in range(i):
                    sum_prod += A[n-i+j] * B[j]
                if sum_prod:
                    break
                else:
                    if used_negative:
                        count[i] += 2*cycled_count
                    else:
                        count[i] += cycled_count

Использование:

Вы должны установить Pypy (для Python 2 !!!). Модуль параллельного Python не портирован для Python 3. Затем вам необходимо установить модуль параллельного Python pp-1.6.4.zip . Распакуйте его cdв папку и позвоните pypy setup.py install.

Тогда вы можете вызвать мою программу с

pypy you-do-the-math.py 15

Он автоматически определит количество процессоров. После завершения программы могут быть сообщения об ошибках, просто игнорируйте их. n = 16должно быть возможно на вашей машине.

Выход:

Calculation for n = 15 took 2:50 minutes

 1  83940771168 / 470184984576  17.85%
 2  17379109692 / 470184984576   3.70%
 3   3805906050 / 470184984576   0.81%
 4    887959110 / 470184984576   0.19%
 5    223260870 / 470184984576   0.05%
 6     67664580 / 470184984576   0.01%
 7     30019950 / 470184984576   0.01%
 8     20720730 / 470184984576   0.00%
 9     18352740 / 470184984576   0.00%
10     17730480 / 470184984576   0.00%
11     17566920 / 470184984576   0.00%
12     17521470 / 470184984576   0.00%
13     17510280 / 470184984576   0.00%
14     17507100 / 470184984576   0.00%
15     17506680 / 470184984576   0.00%

Заметки и идеи:

  • У меня процессор i7-4600m с 2 ядрами и 4 потоками. Неважно, если я использую 2 или 4 темы. Загрузка процессора составляет 50% с 2 потоками и 100% с 4 потоками, но это все равно занимает столько же времени. Я не знаю почему. Я проверил, что каждый поток имеет только половину объема данных, когда есть 4 потока, проверил результаты, ...
  • Я использую много списков. Python не очень эффективен при хранении, мне приходится копировать много списков, поэтому я подумал об использовании целого числа вместо этого. Я мог бы использовать биты 00 (для 0) и 11 (для 1) в векторе A, и биты 10 (для -1), 00 (для 0) и 01 (для 1) в векторе B. Для произведения из A и B, мне нужно будет только рассчитать A & Bи сосчитать блоки 01 и 10. Циклирование может быть выполнено с помощью смещения вектора и использования масок ... Я на самом деле все это реализовал, вы можете найти это в некоторых моих старых коммитах на Github. Но оказалось, что медленнее, чем со списками. Я думаю, Pypy действительно оптимизирует операции со списком.
Jakube
источник
На моем компьютере запуск n = 12 занимает 7:25, в то время как мой кусок C ++ занимает около 1:23, что делает его примерно в 5 раз быстрее. Имея только два настоящих ядра, мой ЦП получит примерно 2,5-кратный коэффициент по сравнению с однопоточным приложением, поэтому истинный 8-ядерный ЦП должен работать примерно в 3 раза быстрее, и это не считается с улучшением базовой скорости одноядерности по сравнению с мой стареющий i3-2100. Стоит ли спорить о том, стоит ли проходить через все эти циклы C ++ для экспоненциально растущего времени вычислений.
Я чувствую себя codegolf.stackexchange.com/questions/41021/… ... Будет ли полезна последовательность де Брюина ?
Kennytm
Что касается многопоточности, вы можете выжать немного больше из ваших 2 + 2 ядер, заблокировав каждый поток на одном. Усиление x2 происходит из-за того, что планировщик перемещается вокруг ваших потоков каждый раз, когда в системе перемещается спичка. С блокировкой ядра вы, вероятно, получите вместо этого усиление в 2,5 раза. Не знаю, позволяет ли Python устанавливать привязку к процессору.
Спасибо, я посмотрю на это. Но я новичок в многопоточности.
Якуб
В nbviewer.ipython.org/gist/minrk/5500077 есть упоминания об этом, хотя и используется другой инструмент для параллелизма.
5

неуклюжий хулиган - C ++ - слишком медленно

Ну, так как лучший программист взял реализацию C ++, я призываю к выходу для этого.

#include <cstdlib>
#include <cmath>
#include <vector>
#include <bitset>
#include <future>
#include <iostream>
#include <iomanip>

using namespace std;

/*
6^^n events will be generated, so the absolute max
that can be counted by a b bits integer is
E(b*log(2)/log(6)), i.e. n=24 for a 64 bits counter

To enumerate 3 possible values of a size n vector we need
E(n*log(3)/log(2))+1 bits, i.e. 39 bits
*/
typedef unsigned long long Counter; // counts up to 6^^24

typedef unsigned long long Benumerator; // 39 bits
typedef unsigned long      Aenumerator; // 24 bits

#define log2_over_log6 0.3869

#define A_LENGTH ((size_t)(8*sizeof(Counter)*log2_over_log6))
#define B_LENGTH (2*A_LENGTH)

typedef bitset<B_LENGTH> vectorB;

typedef vector<Counter> OccurenceCounters;

// -----------------------------------------------------------------
// multithreading junk for CPUs detection and allocation
// -----------------------------------------------------------------
int number_of_CPUs(void)
{
    int res = thread::hardware_concurrency();
    return res == 0 ? 8 : res;
}

#ifdef __linux__
#include <sched.h>
void lock_on_CPU(int cpu)
{
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(cpu, &mask);
    sched_setaffinity(0, sizeof(mask), &mask);
}
#elif defined (_WIN32)
#include <Windows.h>
#define lock_on_CPU(cpu) SetThreadAffinityMask(GetCurrentThread(), 1 << cpu)
#else
// #warning is not really standard, so this might still cause compiler errors on some platforms. Sorry about that.
#warning "Thread processor affinity settings not supported. Performances might be improved by providing a suitable alternative for your platform"
#define lock_on_CPU(cpu)
#endif

// -----------------------------------------------------------------
// B values generator
// -----------------------------------------------------------------
struct Bvalue {
    vectorB p1;
    vectorB m1;
};

struct Bgenerator {
    int n;                 // A length
    Aenumerator stop;      // computation limit
    Aenumerator zeroes;    // current zeroes pattern
    Aenumerator plusminus; // current +1/-1 pattern
    Aenumerator pm_limit;  // upper bound of +1/-1 pattern

    Bgenerator(int n, Aenumerator start=0, Aenumerator stop=0) : n(n), stop(stop)
    {
        // initialize generator so that first call to next() will generate first value
        zeroes    = start - 1;
        plusminus = -1;
        pm_limit  = 0;
    }

    // compute current B value
    Bvalue value(void)
    {
        Bvalue res;
        Aenumerator pm = plusminus;
        Aenumerator position = 1;
        int i_pm = 0;
        for (int i = 0; i != n; i++)
        {
            if (zeroes & position)
            {
                if (i_pm == 0)  res.p1 |= position; // first non-zero value fixed to +1
                else         
                {
                    if (pm & 1) res.m1 |= position; // next non-zero values
                    else        res.p1 |= position;
                    pm >>= 1;
                }
                i_pm++;
            }
            position <<= 1;
        }
        res.p1 |= (res.p1 << n); // concatenate 2 Bpre instances
        res.m1 |= (res.m1 << n);
        return res;
    }

    // next value
    bool next(void)
    {
        if (++plusminus == pm_limit)
        {
            if (++zeroes == stop) return false;
            plusminus = 0;
            pm_limit = (1 << vectorB(zeroes).count()) >> 1;
        }
        return true;
    }

    // calibration: produces ranges that will yield the approximate same number of B values
    vector<Aenumerator> calibrate(int segments)
    {
        // setup generator for the whole B range
        zeroes = 0;
        stop = 1 << n;
        plusminus = -1;
        pm_limit = 0;

        // divide range into (nearly) equal chunks
        Aenumerator chunk_size = ((Aenumerator)pow (3,n)-1) / 2 / segments;

        // generate bounds for zeroes values
        vector<Aenumerator> res(segments + 1);
        int bound = 0;
        res[bound] = 1;
        Aenumerator count = 0;
        while (next()) if (++count % chunk_size == 0) res[++bound] = zeroes;
        res[bound] = stop;
        return res;
    }
};

// -----------------------------------------------------------------
// equiprobable A values merging
// -----------------------------------------------------------------
static char A_weight[1 << A_LENGTH];
struct Agroup {
    vectorB value;
    int     count;
    Agroup(Aenumerator a = 0, int length = 0) : value(a), count(length) {}
};
static vector<Agroup> A_groups;

Aenumerator reverse(Aenumerator n) // this works on N-1 bits for a N bits word
{
    Aenumerator res = 0;
    if (n != 0) // must have at least one bit set for the rest to work
    {
        // construct left-padded reverse value
        for (int i = 0; i != 8 * sizeof(n)-1; i++)
        {
            res |= (n & 1);
            res <<= 1;
            n >>= 1;
        }

        // shift right to elimitate trailing zeroes
        while (!(res & 1)) res >>= 1;
    }
    return res;
}

void generate_A_groups(int n)
{
    static bitset<1 << A_LENGTH> lookup(0);
    Aenumerator limit_A = (Aenumerator)pow(2, n);
    Aenumerator overflow = 1 << n;
    for (char & w : A_weight) w = 0;

    // gather rotation cycles
    for (Aenumerator a = 0; a != limit_A; a++)
    {
        Aenumerator rotated = a;
        int cycle_length = 0;
        for (int i = 0; i != n; i++)
        {
            // check for new cycles
            if (!lookup[rotated])
            {
                cycle_length++;
                lookup[rotated] = 1;
            }

            // rotate current value
            rotated <<= 1;
            if (rotated & overflow) rotated |= 1;
            rotated &= (overflow - 1);
        }

        // store new cycle
        if (cycle_length > 0) A_weight[a] = cycle_length;
    }

    // merge symetric groups
    for (Aenumerator a = 0; a != limit_A; a++)
    {
        // skip already grouped values
        if (A_weight[a] == 0) continue;

        // regroup a symetric pair
        Aenumerator r = reverse(a);
        if (r != a)
        {
            A_weight[a] += A_weight[r];
            A_weight[r] = 0;
        }  
    }

    // generate groups
    for (Aenumerator a = 0; a != limit_A; a++)
    {
        if (A_weight[a] != 0) A_groups.push_back(Agroup(a, A_weight[a]));
    }
}

// -----------------------------------------------------------------
// worker thread
// -----------------------------------------------------------------
OccurenceCounters solve(int n, int index, Aenumerator Bstart, Aenumerator Bstop)
{
    OccurenceCounters consecutive_zero_Z(n, 0);  // counts occurences of the first i terms of Z being 0

    // lock on assigned CPU
    lock_on_CPU(index);

    // enumerate B vectors
    Bgenerator Bgen(n, Bstart, Bstop);
    while (Bgen.next())
    {
        // get next B value
        Bvalue B = Bgen.value();

        // enumerate A vector groups
        for (const auto & group : A_groups)
        {
            // count consecutive occurences of inner product equal to zero
            vectorB sliding_A(group.value);
            for (int i = 0; i != n; i++)
            {
                if ((sliding_A & B.p1).count() != (sliding_A & B.m1).count()) break;
                consecutive_zero_Z[i] += group.count;
                sliding_A <<= 1;
            }
        }
    }
    return consecutive_zero_Z;
}

// -----------------------------------------------------------------
// main
// -----------------------------------------------------------------
#define die(msg) { cout << msg << endl; exit (-1); }

int main(int argc, char * argv[])
{
    int n = argc == 2 ? atoi(argv[1]) : 16; // arbitray value for debugging
    if (n < 1 || n > 24) die("vectors of lenght between 1 and 24 is all I can (try to) compute, guv");

    auto begin = time(NULL);

    // one worker thread per CPU
    int num_workers = number_of_CPUs();

    // regroup equiprobable A values
    generate_A_groups(n);

    // compute B generation ranges for proper load balancing
    vector<Aenumerator> ranges = Bgenerator(n).calibrate(num_workers);

    // set workers to work
    vector<future<OccurenceCounters>> workers(num_workers);
    for (int i = 0; i != num_workers; i++)
    {
        workers[i] = async(
            launch::async, // without this parameter, C++ will decide whether execution shall be sequential or asynchronous (isn't C++ fun?).
            solve, n, i, ranges[i], ranges[i+1]); 
    }

    // collect results
    OccurenceCounters result(n + 1, 0);
    for (auto& worker : workers)
    {
        OccurenceCounters partial = worker.get();
        for (size_t i = 0; i != partial.size(); i++) result[i] += partial[i]*2; // each result counts for a symetric B pair
    }
    for (Counter & res : result) res += (Counter)1 << n; // add null B vector contribution
    result[n] = result[n - 1];                           // the last two probabilities are equal by construction

    auto duration = time(NULL) - begin;

    // output
    cout << "done in " << duration / 60 << ":" << setw(2) << setfill('0') << duration % 60 << setfill(' ')
        << " by " << num_workers << " worker thread" << ((num_workers > 1) ? "s" : "") << endl;
    Counter events = (Counter)pow(6, n);
    int width = (int)log10(events) + 2;
    cout.precision(5);
    for (int i = 0; i <= n; i++) cout << setw(2) << i << setw(width) << result[i] << " / " << events << " " << fixed << (float)result[i] / events << endl;

    return 0;
}

Сборка исполняемого файла

Это автономный источник C ++ 11, который компилируется без предупреждений и работает на:

  • Win7 & MSVC2013
  • Win7 & MinGW - g ++ 4.7
  • Ubuntu & g ++ 4.8 (в виртуальной машине VirtualBox с 2 выделенными процессорами)

Если вы компилируете с g ++, используйте: g ++ -O3 -pthread -std = c ++ 11, если вы забудете
об этом, вы -pthreadполучите приятный и удобный дамп ядра.

Оптимизации

  1. Последний член Z равен первому (Bpre x A в обоих случаях), поэтому последние два результата всегда равны, что позволяет вычислить последнее значение Z.
    Усиление ничтожно мало, но его кодирование ничего не стоит, поэтому вы можете использовать его.

  2. Как обнаружил Якуб, все циклические значения данного вектора A дают одинаковые вероятности.
    Вы можете вычислить их с одним экземпляром A и умножить результат на количество его возможных поворотов. Группы вращения легко могут быть предварительно вычислены за незначительное количество времени, так что это огромный прирост скорости.
    Поскольку число перестановок вектора длины n равно n-1, сложность падает с o (6 n ) до o (6 n / (n-1)), что в основном делает шаг вперед в течение того же времени вычисления.

  3. Похоже, пары симметричных паттернов также генерируют одинаковые вероятности. Например, 100101 и 101001.
    У меня нет математических доказательств этого, но интуитивно, когда представлены все возможные шаблоны B, каждое симметричное значение A будет свернуто с соответствующим симметричным значением B для того же глобального результата.
    Это позволяет перегруппировать еще несколько векторов A, чтобы уменьшить число групп A примерно на 30%.

  4. НЕПРАВИЛЬНО По какой-то полузадачной причине все шаблоны с одним или двумя установленными битами дают одинаковый результат. Это не означает, что многие отдельные группы, но все же они могут быть объединены практически бесплатно.

  5. Векторы B и -B (B со всеми компонентами, умноженными на -1) дают одинаковые вероятности.
    (например, [1, 0, -1, 1] и [-1, 0, 1, -1]).
    За исключением нулевого вектора (все компоненты равны 0), B и -B образуют пару различных векторов.
    Это позволяет сократить число значений B пополам, учитывая только одно из каждой пары и умножая его вклад на 2, добавляя известный глобальный вклад нулевого B к каждой вероятности только один раз.

Как это устроено

Количество значений B огромно (3 n ), поэтому для их предварительного вычисления потребуются неприличные объемы памяти, что замедлит вычисления и в конечном итоге исчерпает доступную оперативную память.
К сожалению, я не смог найти простой способ перечисления половины набора оптимизированных значений B, поэтому я прибег к кодированию выделенного генератора.

Могучий генератор B был очень забавным для кода, хотя языки, которые поддерживают механизмы выхода, позволили бы программировать его намного более элегантным способом.
Вкратце, он рассматривает «скелет» вектора Bpre как двоичный вектор, где 1 представляют действительные значения -1 или +1.
Среди всех этих значений потенциала + 1 / -1 первое фиксируется на +1 (таким образом выбирается один из возможных векторов B / -B), и перечисляются все остальные возможные комбинации + 1 / -1.
Наконец, простая система калибровки гарантирует, что каждый рабочий поток будет обрабатывать диапазон значений примерно одинакового размера.

Значения сильно фильтруются для перегруппировки в равновероятных кусках.
Это делается на этапе предварительного вычисления, при котором перебор проверяет все возможные значения.
Эта часть имеет незначительное время выполнения O (2 n ) и не нуждается в оптимизации (код уже достаточно нечитаемый, как есть!).

Чтобы оценить внутренний продукт (который нужно проверить только на ноль), компоненты -1 и 1 группы B перегруппированы в двоичные векторы.
Внутренний продукт равен нулю, если (и только если) имеется равное число + 1 с и -1 с среди значений B, соответствующих ненулевым значениям A.
Это можно вычислить с помощью простых операций маскирования и подсчета битов, с помощью std::bitsetкоторых можно получить достаточно эффективный код подсчета бит без необходимости прибегать к уродливым внутренним инструкциям.

Работа распределяется поровну между ядрами, с принудительной привязкой к процессору (каждый немного помогает, или, как они говорят).

Пример результата

C:\Dev\PHP\_StackOverflow\C++\VectorCrunch>release\VectorCrunch.exe 16
done in 8:19 by 4 worker threads
 0  487610895942 / 2821109907456 0.17284
 1   97652126058 / 2821109907456 0.03461
 2   20659337010 / 2821109907456 0.00732
 3    4631534490 / 2821109907456 0.00164
 4    1099762394 / 2821109907456 0.00039
 5     302001914 / 2821109907456 0.00011
 6     115084858 / 2821109907456 0.00004
 7      70235786 / 2821109907456 0.00002
 8      59121706 / 2821109907456 0.00002
 9      56384426 / 2821109907456 0.00002
10      55686922 / 2821109907456 0.00002
11      55508202 / 2821109907456 0.00002
12      55461994 / 2821109907456 0.00002
13      55451146 / 2821109907456 0.00002
14      55449098 / 2821109907456 0.00002
15      55449002 / 2821109907456 0.00002
16      55449002 / 2821109907456 0.00002

Выступления

Многопоточность должна отлично работать на этом, хотя только «истинные» ядра будут полностью способствовать скорости вычислений. Мой процессор имеет только 2 ядра для 4 процессоров, и выигрыш по сравнению с однопоточной версией составляет «только» около 3,5.

Составители

Первоначальная проблема с многопоточностью привела меня к мысли, что компиляторы GNU работают хуже, чем Microsoft.

После более тщательного тестирования, похоже, g ++ снова побеждает, производя код примерно на 30% быстрее (такое же соотношение я заметил и в двух других проектах, требующих большого объема вычислений).

Примечательно, что std::bitsetбиблиотека реализована с помощью специальных инструкций подсчета битов в g ++ 4.8, в то время как MSVC 2013 использует только циклы обычных битовых сдвигов.

Как и следовало ожидать, компиляция в 32 или 64 битах не имеет значения.

Дальнейшие уточнения

Я заметил, что несколько групп А производят одинаковые вероятности после всех операций сокращения, но я не смог определить схему, которая позволила бы их перегруппировать.

Вот пары, которые я заметил для n = 11:

  10001011 and 10001101
 100101011 and 100110101
 100101111 and 100111101
 100110111 and 100111011
 101001011 and 101001101
 101011011 and 101101011
 101100111 and 110100111
1010110111 and 1010111011
1011011111 and 1011111011
1011101111 and 1011110111

источник
Я думаю, что последние две вероятности всегда должны быть одинаковыми. Это потому, что n + 1-й внутренний продукт фактически совпадает с первым.
Я имел в виду, что первые n внутренних произведений равны нулю тогда и только тогда, когда первые n + 1 равны. Самый последний внутренний продукт не предоставляет никакой новой информации, как вы уже делали это раньше. Таким образом, число строк, которые дают n нулевых произведений, точно такое же, как число строк, которые дают n + 1 нулевых произведений.
Из интереса, что вы вместо этого точно вычисляли?
Спасибо за обновление, но я не понимаю строку "0 2160009216 2176782336". Что именно вы рассчитываете в этом случае? Вероятность того, что первый внутренний продукт равен нулю, намного меньше, чем это.
Не могли бы вы дать несколько советов о том, как скомпилировать и запустить это? Я попробовал g ++ -Wall -std = c ++ 11 kuroineko.cpp -o kuroineko и ./kuroineko 12, но это даетterminate called after throwing an instance of 'std::system_error' what(): Unknown error -1 Aborted (core dumped)