Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 54 additions & 28 deletions masterslave.q
Original file line number Diff line number Diff line change
@@ -1,62 +1,88 @@
//Master slave algorithm
//Each process initially assumes it's the master process
//Master process is determined by oldest process start time
//Master process is determined by oldest process start time
//Connect to other slave processes and retrieve details to determine which process is master

//run lines
//q torq.q -load masterslave.q -proctype lb -procname lb1 -p 6100 -debug -parentproctype wdb
//q torq.q -load masterslave.q -proctype lb -procname lb2 -p 6101 -debug -parentproctype wdb


// the list of processes to connect to
.servers.CONNECTIONS:`lb

// custom function, this is invoked when a new outbound connection is created
// to be customised to invoke negotiation of processes
.servers.connectcustom:{.lg.o[`connect;"found new connection"]; show x}

// create connections
.servers.startup[]
//test cases
//one process starts up, it becomes master
//two processes, only one is master
//two processes, master dies, slave takes over
//two processes, one is made master, new process added, original master(if it's starttime
// is still oldest)
//two processes, one is made master, an older process is connected, recently connected process
// should then become master
//3 processes, one master, master dies, one of the remaining two takes over

init:{
// the list of processes to connect to
.servers.CONNECTIONS:distinct .servers.CONNECTIONS,.proc.proctype;

// custom function, this is invoked when a new outbound connection is created
// to be customised to invoke negotiation of processes
.servers.connectcustom:{.lg.o[`connect;"found new connection"];
.masterslave.checkifmaster[];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get a new connection in, then we only want to get the details for that single new connection. WE don't want to re-run the whole thing for all connections, which I think is what is happening here.

};
// create connections
.servers.startup[]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having .servers.startup[] in here is fine for testing purposes, but in final version this shouldn't be in here, i.e. someone who loads this should do

.ms.init[]
and then call
.servers.startup[] later in their code

Then init function should also be within the .masterslave namespace

};

\d .masterslave

//table scehma of connected lb processes with ismaster status
statustable:([handle:`int$()] procname:`symbol$();starttimeUTC:`timestamp$();ismaster:`boolean$())
statustable:([handle:`int$()] procname:();starttimeUTC:`timestamp$();ismaster:`boolean$());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you change procname to ()? symbol is better.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was getting type errors because I kept switching between symbol and string procnames.
Will change so there will only be sym procnames


//store own process start timestamp
start:.z.p
start:.z.p;

//set details dict with own details
details:{`procname`starttimeUTC`ismaster!(.proc.procname;.masterslave.start;1b)}
details:{
`procname`starttimeUTC!(string .proc.procname;.masterslave.start)
};

//get details of procname provided
getdetails:{[processname] (first exec w from .servers.SERVERS where procname like processname) ".masterslave.details[]"}
getdetails:{[processname]
(first exec w from .servers.SERVERS where procname like processname,not null w) (`.masterslave.details;[])
};

//update .masterslave.statustable with other proc details and update ismaster col to determine which process is master
//update .masterslave.statustable with other proc details and update ismaster col to determine which process is master
addmember:{[processname]
update ismaster:0b from (`.masterslave.statustable upsert .masterslave.getdetails[processname],(enlist `handle)!(enlist first exec w from .servers.SERVERS where procname like processname)) where starttimeUTC<>min starttimeUTC}
`.masterslave.statustable upsert .masterslave.getdetails[processname],
(`handle`ismaster)!(first exec w from .servers.SERVERS where procname like processname,not null w;1b)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this return value used for anything?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as in when it returns `.masterslave.statustable?
This func is just to upsert process details dictionary into the .ms.statustable, I'm not aware of any other return value?

};

masterupdate:{update ismaster:starttimeUTC=min starttimeUTC from `.masterslave.statustable}

//is the process itself the master
ammaster:{exec ismaster from .masterslave.statustable where handle=0}
ammaster:{exec ismaster from .masterslave.statustable where handle=0};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably returning a list, should return an atom


//find which process is the master
findmaster:{first exec handle from .masterslave.statustable where ismaster=1b}
//find which process is the master - can only be run after checkifmaster has been run
findmaster:{exec handle, procname from .masterslave.statustable where ismaster=1b};

//wrapper func for finding master
checkifmaster:{
.servers.startup[];
.masterslave.addmember each string exec procname from .servers.SERVERS where proctype like "lb", not null w
}
(.masterslave.addmember')[string exec procname from .servers.SERVERS where proctype like "lb", not null w];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't hardcode the proctype in here. Should just be

where proctype=.proc.proctype
.i.e proctype is the same as the local process

.masterslave.masterupdate[]
};

deletedropped:{[W]delete from `.masterslave.statustable where handle=W};

//add pc override here
//remove dropped connection from statustable
//tell other alive processes to renegotiate who the master is

//tell other alive processes to renegotiate who the master is
pc:{[result;W]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pc should probably only take one param here, W

.masterslave.deletedropped[W];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need a check in here to not do this if W=0
When the process starts up in the background it will do a close event on the 0 handle, which we don't want to remove from the table.

.masterslave.checkifmaster[];
result
};

\d .

//populate statutable and find which process is master
.masterslave.checkifmaster[]

init[];

//.z.pc call
//.z.pc call
.z.pc:{.masterslave.pc[x y;y]}.z.pc;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to wrap this in an error trap (in case .z.pc isn't initially defined) i.e.
.z.pc:{.masterslave.pc[x y;y]}@[value;`.z.pc;{{[x]}}]
Also as per comment above, better to have .masterslave.pc only take one param, and then .z.pc is

.z.pc:{x y; .masterslave.pc[y]}@[value;`.z.pc;{{[x]}}]

.z.pc should only be defined within init[]