Skip to content

Commit

Permalink
Allow preprocessing information to be supplied via named pipes.
Browse files Browse the repository at this point in the history
  • Loading branch information
mkskeller committed Nov 22, 2021
1 parent ab63751 commit eac6456
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 4 deletions.
5 changes: 4 additions & 1 deletion Processor/Data_Files.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Preprocessing<T>* Preprocessing<T>::get_new(
return get_live_prep(proc, usage);
else
return new Sub_Data_Files<T>(machine.get_N(),
machine.template prep_dir_prefix<T>(), usage);
machine.template prep_dir_prefix<T>(), usage, BaseMachine::thread_num);
}

template<class T>
Expand Down Expand Up @@ -185,6 +185,9 @@ Sub_Data_Files<T>::~Sub_Data_Files()
template<class T>
void Sub_Data_Files<T>::seekg(DataPositions& pos)
{
if (OnlineOptions::singleton.file_prep_per_thread)
return;

if (T::LivePrep::use_part)
{
get_part().seekg(pos);
Expand Down
16 changes: 16 additions & 0 deletions Processor/OnlineOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ OnlineOptions::OnlineOptions() : playerno(-1)
bucket_size = 4;
cmd_private_input_file = "Player-Data/Input";
cmd_private_output_file = "";
file_prep_per_thread = false;
#ifdef VERBOSE
verbose = true;
#else
Expand Down Expand Up @@ -170,6 +171,16 @@ OnlineOptions::OnlineOptions(ez::ezOptionParser& opt, int argc,
"--live-preprocessing" // Flag token.
);

opt.add(
"", // Default.
0, // Required?
0, // Number of args expected.
0, // Delimiter if expecting multiple args.
"Preprocessing from files by thread (use with pipes)", // Help description.
"-f", // Flag token.
"--file-prep-per-thread" // Flag token.
);

opt.add(
to_string(default_batch_size).c_str(), // Default.
0, // Required?
Expand Down Expand Up @@ -224,6 +235,11 @@ OnlineOptions::OnlineOptions(ez::ezOptionParser& opt, int argc,
live_prep = not opt.get("-F")->isSet;
else
live_prep = opt.get("-L")->isSet;
if (opt.isSet("-f"))
{
live_prep = false;
file_prep_per_thread = true;
}
opt.get("-b")->getInt(batch_size);
opt.get("--memory")->getString(memtype);
bits_from_squares = opt.isSet("-Q");
Expand Down
1 change: 1 addition & 0 deletions Processor/OnlineOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class OnlineOptions
std::string cmd_private_input_file;
std::string cmd_private_output_file;
bool verbose;
bool file_prep_per_thread;

OnlineOptions();
OnlineOptions(ez::ezOptionParser& opt, int argc, const char** argv,
Expand Down
10 changes: 8 additions & 2 deletions Processor/PrepBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
#include "PrepBase.h"

#include "Data_Files.h"
#include "OnlineOptions.h"

string PrepBase::get_suffix(int thread_num)
{
(void) thread_num;
return "";
if (OnlineOptions::singleton.file_prep_per_thread)
{
assert(thread_num >= 0);
return "-T" + to_string(thread_num);
}
else
return "";
}

string PrepBase::get_filename(const string& prep_data_dir,
Expand Down
11 changes: 11 additions & 0 deletions Programs/Source/test_thread_mul.mpc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
n = 1000000
x = sint.Array(n)
x.assign_vector(regint.inc(n))

@multithread(2, n)
def _(base, size):
x.assign_vector(x.get_vector(base, size) ** 2, base)

print_ln('%s', x[2].reveal())
crash(x[2].reveal() != 4)
crash(x[n - 1].reveal() != (n - 1) ** 2)
17 changes: 17 additions & 0 deletions Scripts/test_streaming.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

make stream-fake-mascot-triples.x
./compile.py test_thread_mul || exit 1

rm Player-Data/2-p-128/Triples-p-P?-T?
mkdir Player-Data/2-p-128

for i in 0 1; do
for j in 0 1 2; do
mknod Player-Data/2-p-128/Triples-p-P$i-T$j p || exit 1
done
done

./stream-fake-mascot-triples.x &

Scripts/mascot.sh test_thread_mul -f || exit 1
20 changes: 19 additions & 1 deletion Tools/Buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "Processor/BaseMachine.h"

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

bool BufferBase::rewind = false;

Expand All @@ -21,8 +23,19 @@ void BufferBase::setup(ifstream* f, int length, const string& filename,
this->filename = filename;
}

bool BufferBase::is_pipe()
{
struct stat buf;
if (stat(filename.c_str(), &buf))
return S_ISFIFO(buf.st_mode);
else
return false;
}

void BufferBase::seekg(int pos)
{
assert(not is_pipe());

#ifdef DEBUG_BUFFER
if (pos != 0)
printf("seek %d %s thread %d\n", pos, filename.c_str(),
Expand Down Expand Up @@ -52,6 +65,8 @@ void BufferBase::seekg(int pos)

void BufferBase::try_rewind()
{
assert(not is_pipe());

#ifndef INSECURE
string type;
if (field_type.size() and data_type.size())
Expand All @@ -70,6 +85,9 @@ void BufferBase::try_rewind()

void BufferBase::prune()
{
if (is_pipe())
return;

if (file and (not file->good() or file->peek() == EOF))
purge();
else if (file and file->tellg() != header_length)
Expand Down Expand Up @@ -99,7 +117,7 @@ void BufferBase::prune()

void BufferBase::purge()
{
if (file)
if (file and not is_pipe())
{
#ifdef VERBOSE
cerr << "Removing " << filename << endl;
Expand Down
1 change: 1 addition & 0 deletions Tools/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class BufferBase
const char* type = "", const string& field = {});
void seekg(int pos);
bool is_up() { return file != 0; }
bool is_pipe();
void try_rewind();
void prune();
void purge();
Expand Down
65 changes: 65 additions & 0 deletions Utils/stream-fake-mascot-triples.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* stream-fake-mascot-triples.cpp
*
*/

#include "Protocols/Share.h"
#include "Math/gfpvar.h"
#include "Tools/benchmarking.h"

#include "Math/Setup.hpp"
#include "Protocols/fake-stuff.hpp"

class Info
{
public:
int thread_num;
int nplayers;
gfpvar key;
pthread_t thread;
};

void* run(void* arg)
{
auto& info = *(Info*) arg;
Files<Share<gfpvar>> files(info.nplayers, info.key, PREP_DIR, DATA_TRIPLE, info.thread_num);
SeededPRNG G;
int count = 0;
while (true)
{
gfpvar triple[3];
for (int i = 0; i < 2; i++)
triple[i].randomize(G);
triple[2] = triple[0] * triple[1];
for (int i = 0; i < 3; i++)
files.output_shares(triple[i]);
count++;
}
cerr << "failed after " << count << endl;
return 0;
}

int main()
{
insecure("preprocessing");
typedef Share<gfpvar> T;
int nplayers = 2;
int lgp = 128;
string prep_data_prefix = PREP_DIR;
gfpvar::generate_setup<T>(prep_data_prefix, nplayers, lgp);
T::mac_key_type keyp;
generate_mac_keys<T>(keyp, nplayers, prep_data_prefix);

int nthreads = 3;
OnlineOptions::singleton.file_prep_per_thread = true;
vector<Info> infos(3);
for (int i = 0; i < nthreads; i++)
{
auto& info = infos[i];
info.thread_num = i;
info.nplayers = nplayers;
info.key = keyp;
pthread_create(&info.thread, 0, run, &info);
}
pthread_join(infos[0].thread, 0);
}

0 comments on commit eac6456

Please sign in to comment.