all repos — underbbs @ 7a2eb99eb6d60d23c034f6255d5b52aea08e24d5

decentralized social media client

server/server.go (raw)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package server

import (
	"context"
	"errors"
	"fmt"
	"forge.lightcrystal.systems/lightcrystal/underbbs/adapter"
	"forge.lightcrystal.systems/lightcrystal/underbbs/models"
	"golang.org/x/time/rate"
	"hacklab.nilfm.cc/quartzgun/cookie"
	"hacklab.nilfm.cc/quartzgun/renderer"
	"io/ioutil"
	"log"
	"net/http"
	"nhooyr.io/websocket"
	"sync"
	"time"
)

type Subscriber struct {
	key       string
	msgs      chan []byte
	data      chan models.SocketData
	closeSlow func()
}

type BBSServer struct {
	subscribeMessageBuffer int
	publishLimiter         *rate.Limiter
	logf                   func(f string, v ...interface{})
	serveMux               http.ServeMux
	subscribersLock        sync.Mutex
	subscribers            map[*Subscriber][]adapter.Adapter
}

func New() *BBSServer {
	srvr := &BBSServer{
		subscribeMessageBuffer: 16,
		logf:                   log.Printf,
		subscribers:            make(map[*Subscriber][]adapter.Adapter),
	}

	// frontend is here
	srvr.serveMux.Handle("/app/", http.StripPrefix("/app/", renderer.Subtree("./frontend/dist")))

	// api
	srvr.serveMux.Handle("/api/", http.StripPrefix("/api", srvr.apiMux()))

	// websocket
	srvr.serveMux.HandleFunc("/subscribe", srvr.subscribeHandler)
	// publish is unused currently, we just use the API and send data back on the websocket
	// srvr.serveMux.HandleFunc("/publish", srvr.publishHandler)

	return srvr
}

func (self *BBSServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	self.serveMux.ServeHTTP(w, r)
}

func (self *BBSServer) subscribeHandler(w http.ResponseWriter, r *http.Request) {

	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		Subprotocols: []string{},
	})
	if err != nil {
		self.logf("%v", err)
		return
	}

	ctx := r.Context()
	ctx = c.CloseRead(ctx)

	s := &Subscriber{
		key:  cookie.GenToken(64),
		msgs: make(chan []byte, self.subscribeMessageBuffer),
		data: make(chan models.SocketData),
		closeSlow: func() {
			c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
		},
	}

	// start with an empty set of adapters
	// we'll configure them separately
	adapters := make([]adapter.Adapter, 0, 4)
	self.addSubscriber(s, adapters)

	// defer cleanup and write messages in the background
	defer self.deleteSubscriber(s)
	defer c.Close(websocket.StatusInternalError, "")

	go func() {
		fmt.Println("waiting for data on the subscriber's channel")
		for {
			select {
			case msg := <-s.msgs:
				writeTimeout(ctx, time.Second*5, c, msg)

			case <-ctx.Done():
				fmt.Println("subscriber has disconnected")
				close(s.data)
				return //ctx.Err()
			}
		}
	}()

	// give user their key
	s.msgs <- []byte("{ \"key\":\"" + s.key + "\" }")

	// block on the data channel, serializing and passing the data to the subscriber
	listen([]chan models.SocketData{s.data}, s.msgs)

	fmt.Println("data listener is done!")

	if errors.Is(err, context.Canceled) {
		return
	}
	if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
		websocket.CloseStatus(err) == websocket.StatusGoingAway {
		return
	}
	if err != nil {
		self.logf("%v", err)
		return
	}
}

func listen(channels []chan models.SocketData, out chan []byte) {
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		go func(ch <-chan models.SocketData) {
			defer wg.Done()
			for data := range ch {
				out <- data.ToDatagram()
			}
		}(ch)

	}
	wg.Wait()
	close(out)
}

func (self *BBSServer) publishHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}
	body := http.MaxBytesReader(w, r.Body, 8192)
	msg, err := ioutil.ReadAll(body)
	if err != nil {
		http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
		return
	}

	self.publish(msg)

	w.WriteHeader(http.StatusAccepted)
}

func (self *BBSServer) publish(msg []byte) {
	self.subscribersLock.Lock()
	defer self.subscribersLock.Unlock()

	// send messages to our adapter(s)

	self.publishLimiter.Wait(context.Background())

	// send any response from the adapter(s) back to the client

	/*for s, k := range self.subscribers {
		// whatever logic to select which subscriber to send back to
	}*/

}

func (self *BBSServer) addSubscriber(s *Subscriber, k []adapter.Adapter) {
	self.subscribersLock.Lock()
	self.subscribers[s] = k
	self.subscribersLock.Unlock()
}

func (self *BBSServer) deleteSubscriber(s *Subscriber) {
	self.subscribersLock.Lock()
	delete(self.subscribers, s)
	self.subscribersLock.Unlock()
}

func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	return c.Write(ctx, websocket.MessageText, msg)
}