Skip to content

Commit 7c93ac1

Browse files
committed
pyxes use futexes; faster. remove timeout-0 pun
1 parent 61fe9c6 commit 7c93ac1

5 files changed

Lines changed: 86 additions & 52 deletions

File tree

jsrc/cd.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
#include <Windows.h>
99
#include <stdint.h> // portable: uint64_t MSVC: __int64
1010

11-
struct jtimeval { long tv_sec, tv_usec; };
11+
struct jtimespec { long long tv_sec, tv_nsec; };
12+
struct jtimeval { long long tv_sec, tv_usec; };
1213
struct jtimezone { int tz_minuteswest, tz_dsttime; };
1314

1415
int jgettimeofday(struct jtimeval *tp, struct jtimezone * tzp)
@@ -31,4 +32,13 @@ int jgettimeofday(struct jtimeval *tp, struct jtimezone * tzp)
3132
tp->tv_usec = (long) (system_time.wMilliseconds * 1000);
3233
return 0;
3334
}
35+
36+
//monotonic clock
37+
//alternative is QueryPerformanceCounter; it probably uses rdtsc, which is stable on recent processors, but it gives inconsistent results when the processor goes to sleep
38+
struct jtimespec jmtclk(){
39+
UI t=GetTickCount64();
40+
R(struct jtimeval){.tv_sec=t/1000,.tv_nsec=1000000*(t%1000)};}
41+
#else
42+
#include"j.h"
43+
// Monotonic clock (POSIX): the current CLOCK_MONOTONIC reading.
// r starts zeroed so that, should clock_gettime fail, the caller gets a well-defined zero time instead of indeterminate stack contents.
struct jtimespec jmtclk(void){struct timespec r={0}; clock_gettime(CLOCK_MONOTONIC,&r); R r;}
3444
#endif

jsrc/ct.c

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,14 @@ typedef struct pyxcondmutex{
176176
S pyxorigthread; // thread number that is working on this pyx, or _1 if the value is available
177177
C errcode; // 0 if no error, or error code
178178
#if PYXES
179-
WAITBLOK pyxwb; // sync info
179+
//WAITBLOK pyxwb; // sync info
180+
UI4 state;//one of the below pyx states. Monotonically increases
180181
#endif
181182
} PYXBLOK;
182-
183+
enum{
184+
PYXEMPTY, //the pyx is not filled in, and no one is waiting
185+
PYXWAIT, //at least 1 thread is waiting, and the pyx is not filled in
186+
PYXFULL}; //the pyx is filled in
183187
#if PYXES
184188

185189
// Install a value/errcode into a (recursive) pyx, and broadcast to anyone waiting on it. fa() the pyx to indicate that the thread has released the pyx
@@ -191,7 +195,7 @@ static I jtsetpyxval(J jt, A pyx, A z, C errcode){I res=1;
191195
if(likely(z!=0))ra(z); // since the pyx is recursive, we must ra the result we store into it. Could zap if inplaceable
192196
__atomic_store_n(&((PYXBLOK*)AAV0(pyx))->pyxvalue,z,__ATOMIC_RELEASE); // set result value
193197
// broadcast to wake up any tasks waiting for the result
194-
WAITBLOKFLAG(&((PYXBLOK*)AAV0(pyx))->pyxwb);
198+
if(PYXWAIT==xchga(&((PYXBLOK*)AAV0(pyx))->state,PYXFULL))jfutex_wakea(&((PYXBLOK*)AAV0(pyx))->state);
195199
// unprotect pyx. It was raised when it was assigned to this owner; now it belongs to the system
196200
fa(pyx);
197201
R 1;
@@ -201,41 +205,33 @@ static I jtsetpyxval(J jt, A pyx, A z, C errcode){I res=1;
201205
static A jtcreatepyx(J jt, I thread,D timeout){A pyx;
202206
// Allocate. Init value, cond, and mutex to idle
203207
GAT0(pyx,INT,((sizeof(PYXBLOK)+(SZI-1))>>LGSZI)+1,0); AAV0(pyx)[0]=0; // allocate the result pointer (1), and the cond/mutex for the pyx.
204-
WAITBLOKINIT(&((PYXBLOK*)AAV0(pyx))->pyxwb);
208+
((PYXBLOK*)AAV0(pyx))->state=PYXEMPTY;
205209
// Init the pyx to a recursive box, with raised usecount. AN=1 always. But set the value/errcode to NULL/no error and the thread# to the executing thread
206210
AT(pyx)=BOX+PYX; AFLAG(pyx)=BOX; ACINIT(pyx,ACUC2); AN(pyx)=1; ((PYXBLOK*)AAV0(pyx))->pyxvalue=0; ((PYXBLOK*)AAV0(pyx))->pyxorigthread=thread; ((PYXBLOK*)AAV0(pyx))->errcode=0; ((PYXBLOK*)AAV0(pyx))->pyxmaxwt=timeout;
207211
// The pyx's usecount of 2 is one for the owning thread and one for the current thread, which has a tpop for the pyx that protects it until it is put into its box. When the pyx is filled in the owner will fa().
208212
R pyx;
209213
}
210214

211215
// w is an A holding a pyx value. Return its value when it has been resolved. If it times out
212-
// pyx is an A block holding a pyx.  Wait until the pyx has been filled in, then return its value.
// Fails with the error code stored in the pyx if it was filled with an error, with EVTIME if the pyx's
// maximum wait time (pyxmaxwt) runs out first, or with the pending break code if a break is requested while we wait.
A jtpyxval(J jt,A pyx){PYXBLOK *blok=(PYXBLOK*)AAV0(pyx); UI4 state;
 if(PYXFULL==(state=lda(&blok->state)))goto done;  // already filled: nothing to wait for
 // Record that a thread is waiting, so that the thread filling the pyx knows it must issue a wake.
 // The state only ever increases, so a failed exchange means the pyx is now either PYXWAIT (another waiter
 // registered first - the pyx is still empty and we must wait too) or PYXFULL (it was filled meanwhile).
 // Only the latter may skip the wait.
 // NOTE(review): relies on casa() storing the observed value into state on failure, as the mutex code in mt.c also assumes - confirm
 if(state==PYXEMPTY&&!casa(&blok->state,&state,PYXWAIT)&&state==PYXFULL)goto done;
 UI ns=({D mwt=blok->pyxmaxwt;mwt==inf?IMAX:(I)(mwt*1e9);});  // remaining wait in ns; an infinite timeout becomes IMAX
 struct jtimespec end=jtmtil(ns);  // get the time when we have to give up on this pyx
 while(1){  // repeat till defined
  _jfutex_waitn(&blok->state,PYXWAIT,ns);  // sleep while the state is still PYXWAIT, for at most ns
  if(lda(&blok->state)==PYXFULL)break;  // if pyx was filled, exit and return its value
  I adbreak=lda((US*)&JT(jt,adbreak)[0]);  // break requests
  // The wait may have ended because another thread is requesting a system lock.  If so, we accept it now
  if(unlikely(adbreak>>8)!=0){jtsystemlockaccept(jt,LOCKPRISYM+LOCKPRIPATH+LOCKPRIDEBUG); continue;}  // process lock and keep waiting
  // or, the user may be requesting a BREAK interrupt for deadlock or other slow execution.  In that case fail the pyx.  It will not be deleted until the value has been stored
  if(unlikely(adbreak&0xff))ASSERT(0,adbreak&0xff);  // JBREAK: fail the pyx and exit
  ASSERT(-1!=(ns=jtmdif(end)),EVTIME);  // update timeout; potentially fail the pyx and exit
 }
done:
 if(likely(!!blok->pyxvalue))R blok->pyxvalue;  // valid value, use it
 ASSERT(0,blok->errcode);}  // if error, return the error code
239235

240236
// ************************************* Locks **************************************
241237
// take a readlock on *alock. We come here only if a writelock was requested or running. We have incremented the readlock
@@ -642,8 +638,9 @@ F2(jttcapdot2){A z;
642638
break;}
643639
case 5: { // create a user pyx. y is the timeout in seconds
644640
#if PYXES
645-
ASSERT(AN(w)==1,EVLENGTH) w=cvt(FL,w); D *atimeout=DAV(w); atimeout=*atimeout==0?&inf:atimeout; // get the timeout value. If 0, use infinity
646-
z=box(jtcreatepyx(jt,THREADID(jt),*atimeout)); // create the recursive pyx, owned by this thread
641+
ASSERT(AN(w)==1,EVLENGTH) w=cvt(FL,w); D atimeout=*DAV(w); // get the timeout value
642+
ASSERT(atimeout==inf||atimeout<=9e9,EVLIMIT); // 9e9 is approx 63 bits of ns. This leaves ~300y; should be ok
643+
z=box(jtcreatepyx(jt,THREADID(jt),atimeout)); // create the recursive pyx, owned by this thread
647644
#else
648645
ASSERT(0,EVNONCE)
649646
#endif
@@ -737,6 +734,7 @@ ASSERT(0,EVNONCE)
737734
ASSERT(lockrc<=0,lockrc); // positive is a hard failure
738735
lockfail=lockrc==-1; // -1 is a soft failure
739736
}else{
737+
ASSERT(timeout<=9e9,EVLIMIT); // 9e9 is approx 63 bits of ns. This leaves ~300y; should be ok
740738
I lockrc=jtpthread_mutex_timedlock(jt,(jtpthread_mutex_t*)IAV0(mutex),1e9*timeout,1+THREADID(jt));
741739
ASSERT(lockrc<=0,lockrc); // positive is a hard failure
742740
lockfail=lockrc==-1; // -1 is a soft failure

jsrc/j.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,15 +406,18 @@ extern unsigned int __cdecl _clearfp (void);
406406
#endif
407407

408408
#if SY_WIN32
409-
struct jtimeval { long tv_sec, tv_usec; };
409+
struct jtimespec { long long tv_sec, tv_nsec; };
410+
struct jtimeval { long long tv_sec, tv_usec; };
410411
struct jtimezone { int tz_minuteswest, tz_dsttime; };
411412
int jgettimeofday(struct jtimeval*, struct jtimezone*);
412413
#else
413414
#include <sys/time.h>
415+
#define jtimespec timespec
414416
#define jtimeval timeval
415417
#define jtimezone timezone
416418
#define jgettimeofday gettimeofday
417419
#endif
420+
struct jtimespec jmtclk(void); // monotonic clock. Intended for relative->absolute time conversions when sleeping; on Windows it has poor granularity and is slow
418421

419422
#if SY_64
420423
#if defined(MMSC_VER) // SY_WIN32

jsrc/mt.c

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,33 @@
1-
// jtpthread_mutex*: mutex implementation for macos/ios/..
2-
// loosly modeled after apple pthreads version 486.100.11 and ulrich drepper 'futexes are tricky', plus recursive mutexes
3-
// needed for T. because the mainline version does not support pthread_mutex_timedlock; glibc/pthreads4w are ok
4-
// _does not_ support condition variables (yet); so, use jtpthread_mutex_t if you need timedwait, but pthread_mutex_t if you need condition variables
5-
// _not_ signal-safe
1+
// concurrency primitives, including mutexes
2+
// a raft of reasons for this:
3+
// - return EINTR when interrupted
4+
// mutexes:
5+
// - fast timedlock and recursive mutexes on macos
6+
// - robust by default
7+
// - various edge cases like lock on one thread/release on another, or acquire a lock you already hold, are UB in posix!
8+
// - mutex requisition can be associated with a task, rather than a thread
9+
// condvars:
10+
// - don't reacquire mutex on wake (_very_ slow)
11+
// - wake n on linux
12+
// novel primitives, faster than would be possible with pthreads
13+
// - mutex tokens
14+
// - queue
15+
// - pyx
16+
17+
// mutexes are loosely modeled after Ulrich Drepper's 'Futexes Are Tricky'
618

719
#include"j.h"
820

21+
// Return the absolute time, measured on jmtclk's clock, that lies ns nanoseconds after now.
struct jtimespec jtmtil(UI ns){
 struct jtimespec deadline=jmtclk();  // start from the current time
 UI wholesec=ns/1000000000, fracns=ns%1000000000;  // split the delay into seconds and leftover nanoseconds
 deadline.tv_sec+=wholesec;
 deadline.tv_nsec+=fracns;
 // fracns is below one second; assuming jmtclk's tv_nsec is too, a single carry suffices
 if(deadline.tv_nsec>=1000000000){deadline.tv_nsec-=1000000000; ++deadline.tv_sec;}
 R deadline;}
26+
I jtmdif(struct jtimespec w){
27+
struct jtimespec t=jmtclk();
28+
if(t.tv_sec>w.tv_sec||t.tv_sec==w.tv_sec&&t.tv_nsec>=w.tv_nsec)R -1;
29+
R (w.tv_sec-t.tv_sec)*1000000000+w.tv_nsec-t.tv_nsec;}
30+
931
#if defined(__APPLE__) || defined(__linux__)
1032
enum{FREE=0,LOCK=1,WAIT=2};//values for mutex->v
1133
//todo consider storing owner in the high bits of v. apple pthreads does this. But it means we can't use xadd to unlock. On the other hand, apple is mostly arm now, which doesn't have xadd anyway.
@@ -15,16 +37,17 @@ enum{FREE=0,LOCK=1,WAIT=2};//values for mutex->v
1537

1638
// there is a flaw. If t0 holds lock, t1 attempts to acquire it; when it eventually does, it will leave WAIT in v instead of LOCK,
1739

18-
// todo figure out ULF_WAIT_CANCEL_POINT (I think it allows implementing the desired behaviour for EVATTN)
40+
// todo what is ULF_WAIT_CANCEL_POINT?
1941

2042
// Reset *m to a fresh mutex: the recursive flag is taken from the argument and every other member is zeroed.
void jtpthread_mutex_init(jtpthread_mutex_t *m,B recursive){
 jtpthread_mutex_t fresh={.recursive=recursive};  // members not named in the initializer are zero
 *m=fresh;}
2143
C jtpthread_mutex_lock(J jt,jtpthread_mutex_t *m,I self){
2244
if(uncommon(m->owner==self)){if(unlikely(!m->recursive))R EVCONCURRENCY; m->ct++;R 0;}
23-
UI4 e;if(likely((!(e=lda(&m->v)))&&((e=FREE),casa(&m->v,&e,LOCK)))){m->ct+=m->recursive;m->owner=self;R 0;} //success. test-and-test-and-set is from glibc, mildly optimises the case when many threads swarm a locked mutex
45+
UI4 e;if(likely((!(e=lda(&m->v)))&&((e=FREE),casa(&m->v,&e,LOCK))))goto success; //fast path. test-and-test-and-set is from glibc, mildly optimises the case when many threads swarm a locked mutex. Not sure if this is for the best, but after waffling for a bit I think it is
2446
if(e!=WAIT)e=xchga(&m->v,WAIT); //penalise the multi-waiters case, since it's slower anyway
2547
while(e!=FREE){
2648
#if __linux__
27-
I i=_jfutex_waitn(&m->v,WAIT,(UI)-1); //bug? jfutex_wait doesn't get interrupted by signals on linux
49+
I i=_jfutex_waitn(&m->v,WAIT,(UI)-1);
50+
//bug? futex wait doesn't get interrupted by signals on linux if timeout is null
2851
#else
2952
I i=jfutex_wait(&m->v,WAIT);
3053
#endif
@@ -34,12 +57,11 @@ C jtpthread_mutex_lock(J jt,jtpthread_mutex_t *m,I self){
3457
else if(i==-ENOMEM)R EVWSFULL;//lol
3558
else R EVFACE;}
3659
e=xchga(&m->v,WAIT);} //exit when e==FREE; i.e., _we_ successfully installed WAIT in place of FREE
37-
m->ct+=m->recursive;m->owner=self; R 0;}
60+
success:m->ct+=m->recursive;m->owner=self; R 0;}
3861
I jtpthread_mutex_timedlock(J jt,jtpthread_mutex_t *m,UI ns,I self){
3962
if(uncommon(m->owner==self)){if(unlikely(!m->recursive))R EVCONCURRENCY; m->ct++;R 0;}
40-
UI4 e=0;if((e=lda(&m->v))!=FREE&&((e=FREE),casa(&m->v,&e,LOCK))){m->ct+=m->recursive;m->owner=self;R 0;} //success
41-
struct timespec tgt,now;if(clock_gettime(CLOCK_MONOTONIC,&now))R EVFACE;
42-
tgt.tv_sec=now.tv_sec+ns/1000000000;tgt.tv_nsec=now.tv_nsec+ns%1000000000;if(tgt.tv_nsec>=1000000000){tgt.tv_nsec-=1000000000;tgt.tv_sec++;};
63+
UI4 e=0;if((e=lda(&m->v))!=FREE&&((e=FREE),casa(&m->v,&e,LOCK)))goto success;
64+
struct timespec tgt=jtmtil(ns);
4365
if(common(e!=WAIT)){e=xchga(&m->v,WAIT);if(e==FREE)goto success;} //penalise the multi-waiters case, since it's slower anyway
4466
while(1){
4567
I i=_jfutex_waitn(&m->v,WAIT,ns);
@@ -52,11 +74,7 @@ I jtpthread_mutex_timedlock(J jt,jtpthread_mutex_t *m,UI ns,I self){
5274
e=xchga(&m->v,WAIT);
5375
if(e==FREE)goto success; //exit when e==FREE; i.e., _we_ successfully installed WAIT in place of FREE
5476
if(i==-ETIMEDOUT)R -1; //if the kernel says we timed out, trust it rather than doing another syscall to check the time
55-
clock_gettime(CLOCK_MONOTONIC,&now);
56-
if(now.tv_sec>=tgt.tv_sec || now.tv_sec==tgt.tv_sec&&now.tv_nsec>=tgt.tv_nsec)R -1;//timed out
57-
ns=1000000000*(tgt.tv_sec-now.tv_sec);
58-
if(now.tv_nsec<=tgt.tv_nsec)ns+=tgt.tv_nsec-now.tv_nsec;
59-
else ns+=1000000000-(now.tv_nsec-tgt.tv_nsec);}
77+
if(-1==(ns=jtmdif(tgt)))R -1;} //update delta, abort if timed out
6078
success:m->ct+=m->recursive;m->owner=self; R 0;}
6179
I jtpthread_mutex_trylock(jtpthread_mutex_t *m,I self){
6280
if(uncommon(m->recursive)&&m->owner){if(m->owner!=self)R -1; m->ct++;R 0;}

jsrc/mt.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
// jtpthread_mutex*: mutex implementation for macos/ios/..
1+
// mt.h: mutex, sync, timing related interfaces
22
// see mt.c
33

44
#if PYXES
5+
struct jtimespec jtmtil(UI ns); //returns the time that is ns nanoseconds in the future
6+
I jtmdif(struct jtimespec when); //returns the time in ns from now until when. If when is not in the future, the result is -1
7+
//both of these are implemented in terms of jmtclk and use its clock
8+
59
#if !defined(__APPLE__) && !defined(__linux__)
610
#include <pthread.h>
711
typedef pthread_mutex_t jtpthread_mutex_t;
@@ -56,11 +60,12 @@ C jtpthread_mutex_lock(J jt,jtpthread_mutex_t *m,I self);
5660
I jtpthread_mutex_timedlock(J jt,jtpthread_mutex_t*,UI ns,I self); //absolute timers suck; correct the interface. -1=failure; 0=success; positive=error
5761
I jtpthread_mutex_trylock(jtpthread_mutex_t*,I self); //0=success -1=failure positive=error
5862
C jtpthread_mutex_unlock(jtpthread_mutex_t*,I self); //0 or error code
59-
6063
//note: self must be non-zero
64+
6165
#if defined(__linux__)
6266
#include <linux/futex.h>
6367
#include <sys/syscall.h>
68+
//glibc 'syscall': stupid errno
6469
static inline void jfutex_wake1(UI4 *p){
6570
__asm__ volatile("syscall" :: "a" (SYS_futex), //eax: syscall#
6671
"D" (p), //rdi: ptr
@@ -77,17 +82,17 @@ static inline int jfutex_wait(UI4 *p,UI4 v){
7782
: "a" (SYS_futex), //eax: syscall#
7883
"D" (p), //rdi: ptr
7984
"S" (FUTEX_WAIT), //rsi: op
80-
"d" (v), //rdx: val, espected
85+
"d" (v), //rdx: espected
8186
"r" (pts)); //r10: timeout (null=no timeout)
82-
return r;}
87+
R r;}
8388
// Sleep while *p==v, for at most ns nanoseconds (a relative timeout), via the futex FUTEX_WAIT operation.
// Returns the raw kernel result in rax: callers compare it against negated errno values such as -ETIMEDOUT.
// The syscall is issued directly rather than through glibc's syscall() wrapper so that errno is not involved.
static inline int _jfutex_waitn(UI4 *p,UI4 v,UI ns){
 struct timespec ts={.tv_sec=ns/1000000000, .tv_nsec=ns%1000000000};
 register struct timespec *pts __asm__("r10") = &ts;
 int r;__asm__ volatile("syscall" : "=a"(r) //result in rax
   : "a" (SYS_futex), //eax: syscall#
     "D" (p), //rdi: ptr
     "S" (FUTEX_WAIT), //rsi: op
     "d" (v), //rdx: expected
     "r" (pts) //r10: timeout (relative!)
   : "rcx","r11", //the syscall instruction itself overwrites rcx and r11
     "memory"); //the kernel reads ts and *p through the pointers; force them to be in memory, and don't cache *p across the wait
 R r;}
9398
#elif defined(__APPLE__)

0 commit comments

Comments
 (0)