Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
henrythasler committed Apr 28, 2024
1 parent 4a8aa47 commit 4cd5f40
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 44 deletions.
2 changes: 1 addition & 1 deletion somfy/config.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"rolling_code": 12, "key": 160, "address": 1}
{"rolling_code": 7, "key": 160, "address": 1}
2 changes: 1 addition & 1 deletion somfy/lib/rfm69.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __exit__(self, exc_type, exc_value, traceback):
def debug(self, message, level=0):
"""Debug output depending on debug level."""
if self.debug_level >= level:
print message
print(message)

def read_single(self, address):
"""Read single register via spi"""
Expand Down
34 changes: 17 additions & 17 deletions somfy/sniff.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
import pigpio as gpio
from lib.rfm69 import Rfm69
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
#import matplotlib.pyplot as plt
#from mpl_toolkits.axes_grid1 import make_axes_locatable

# define pigpio GPIO-pins where RESET- and DATA-Pin of RFM69-Transceiver are connected
RESET = 24
DATA = 25

# define pigpio-host
HOST = "rfpi"
HOST = "localhost"

start_tick = 0
state = 0 # 0=Idle, 1=Frame
Expand Down Expand Up @@ -68,7 +68,7 @@ def cbf(pin, level, tick):
# start of frame mark
elif (state == 2) and (delta in range(4850-2*tolerance, 4850+2*tolerance)):
clock = int(np.average(hw_sync)/4)
print "Clock Sync:", hw_sync, clock
print("Clock Sync:", hw_sync, clock)
bits = np.empty(0, dtype=np.uint8)
state = 3
# long pulse
Expand Down Expand Up @@ -96,7 +96,7 @@ def cbf(pin, level, tick):
histogram[0] += 1

frame = np.packbits(decoded)
print "Raw: "+''.join('0x{:02X} '.format(x) for x in frame)
print("Raw: "+''.join('0x{:02X} '.format(x) for x in frame))

for i in range(frame.size-1, 0, -1):
frame[i] = frame[i] ^ frame[i-1]
Expand All @@ -106,11 +106,11 @@ def cbf(pin, level, tick):
cksum = cksum ^ frame[i] ^ (frame[i] >> 4)
cksum = cksum & 0x0f

print "Frame: "+''.join('0x{:02X} '.format(x) for x in frame)
print " Control: 0x{:02X}".format((frame[1] >> 4) & 0x0f)
print " Checksum: {}".format("ok" if cksum==0 else "error")
print " Address: "+''.join('{:02X} '.format(x) for x in frame[4:7])
print " Rolling Code: "+''.join('{:02X} '.format(x) for x in frame[2:4])
print("Frame: "+''.join('0x{:02X} '.format(x) for x in frame))
print(" Control: 0x{:02X}".format((frame[1] >> 4) & 0x0f))
print(" Checksum: {}".format("ok" if cksum==0 else "error"))
print(" Address: "+''.join('{:02X} '.format(x) for x in frame[4:7]))
print(" Rolling Code: "+''.join('{:02X} '.format(x) for x in frame[2:4]))
else:
pass
bits = np.empty(0, dtype=np.uint8)
Expand All @@ -136,7 +136,7 @@ def main():
# just to make sure SPI is working
rx_data = rf.read_single(0x5A)
if rx_data != 0x55:
print "SPI Error"
print("SPI Error")

# configure
rf.write_single(0x01, 0b00000100) # OpMode: STDBY
Expand All @@ -158,7 +158,7 @@ def main():

# wait until RFM-Module is ready
while (rf.read_single(0x27) & 0x80) == 0:
print "waiting..."
print("waiting...")

# filter high frequency noise
pi.set_glitch_filter(DATA, 150)
Expand All @@ -169,7 +169,7 @@ def main():
# watch DATA pin
callback = pi.callback(DATA, gpio.EITHER_EDGE, cbf)

print "Scanning... Press Ctrl-C to abort"
print("Scanning... Press Ctrl-C to abort")
while 1:
sleep(1)

Expand All @@ -188,8 +188,8 @@ def show_histogram(matrix, normalize=1):
try:
main()
except KeyboardInterrupt:
print ""
print("")
finally:
print "done"
if histogram[0] > 0:
show_histogram(np.reshape(histogram[1], (-1, 8)), normalize=histogram[0])
print("done")
# if histogram[0] > 0:
# show_histogram(np.reshape(histogram[1], (-1, 8)), normalize=histogram[0])
50 changes: 25 additions & 25 deletions somfy/transmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
RESET = 24
DATA = 25

# define pigpio-host
HOST = "rfpi"
# define pigpio-host
HOST = "localhost"

COMMANDS={
'null': 0x00,
Expand All @@ -27,7 +27,7 @@

config=None

clock = 640
clock = 640

def main(code):
""" main function """
Expand Down Expand Up @@ -56,15 +56,11 @@ def main(code):
# update config
config["rolling_code"] += 1

# write new config
with open("config.json", "w") as f:
json.dump(config, f)

with Rfm69(host=HOST, channel=0, baudrate=32000, debug_level=0) as rf:
# just to make sure SPI is working
rx_data = rf.read_single(0x5A)
if rx_data != 0x55:
print "SPI Error"
print("SPI Error")

rf.write_single(0x01, 0b00000100) # OpMode: STDBY

Expand Down Expand Up @@ -92,7 +88,7 @@ def main(code):
pi.wave_clear()

# calculate frame-data from command-line arguments
data = pack(">BBH", config["key"] | (config["rolling_code"] & 0x0f), code << 4, config["rolling_code"])
data = pack(">BBH", config["key"] | (config["rolling_code"] & 0x0f), code << 4, config["rolling_code"])
data += pack("<I",config["address"])[:-1]
frame = np.fromstring(data, dtype=np.uint8)

Expand All @@ -101,16 +97,16 @@ def main(code):
for i in range(1,7):
cksum = cksum ^ frame[i] ^ (frame[i] >> 4)
frame[1] = frame[1] | (cksum & 0x0f)
print "Data: "+''.join('0x{:02X} '.format(x) for x in frame)
print("Data: "+''.join('0x{:02X} '.format(x) for x in frame))

# data whitening/obfuscation
for i in range(1, frame.size):
frame[i] = frame[i] ^ frame[i-1]

print "Frame: "+''.join('0x{:02X} '.format(x) for x in frame)
print("Frame: "+''.join('0x{:02X} '.format(x) for x in frame))

# how many consecutive frame repetitions
repetitions = 3
# how many consecutive frame repetitions (besides the one that is transmitted anyway); set to 0 for no repetitions
repetitions = 16 if code == COMMANDS["PROG"] else 3

# create wakeup pulse waveform
pi.wave_add_generic([gpio.pulse(1<<DATA, 0, 10000), gpio.pulse(0, 1<<DATA, 95000)])
Expand Down Expand Up @@ -145,21 +141,21 @@ def main(code):

# assemble whole frame sequence
frames = np.concatenate((
[wakeup],
[hw_sync, hw_sync],
[sw_sync],
[wakeup],
[hw_sync, hw_sync],
[sw_sync],
bits, # send at least once
[eof], # start
[eof], # start
[gap], # inter-frame gap
[255, 0], # start loop
[255, 0],
[hw_sync],
[255, 0],
[hw_sync],
[255, 1, 7, 0],
[sw_sync],
bits,
[eof],
[sw_sync],
bits,
[eof],
[gap], # inter-frame gap
[255, 1, repetitions, 0] # repeat
[255, 1, repetitions, 0] # repeat
))

# send frames
Expand All @@ -184,14 +180,18 @@ def main(code):
sleep(.005)
pi.stop()

# write new config
with open("config.json", "w") as f:
json.dump(config, f)

if __name__ == "__main__":
try:
if sys.argv[1] in COMMANDS:
main(COMMANDS[sys.argv[1]])
else:
print "Unknown command:", sys.argv[1]
print("Unknown command:", sys.argv[1])
except KeyboardInterrupt:
print "KeyboardInterrupt"
print("KeyboardInterrupt")
# just make sure we don't transmit forever
pi = gpio.pi(host=HOST)
pi.write(DATA, 0)
Expand Down
9 changes: 9 additions & 0 deletions velux/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Velux Integra KLI 310/311/312/313

## Wiring

## References

- [yannikmotzet: VELUX INTEGRA® Control](https://github.com/yannikmotzet/velux-integra-control)
- [thorio: ESPHome Velux Controller](https://github.com/thorio/esphome-projects/tree/master/velux-controller)
- [ottelo: Velux Integra Dachfenster Rollladen - ESP8266 (KLI 310)](https://ottelo.jimdofree.com/velux-integra-esp8266/)
57 changes: 57 additions & 0 deletions velux/transmitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Trigger VELUX INTEGRA remote control via GPIO"""

import sys
from time import sleep
import pigpio as gpio

# define pigpio GPIO-pins where the remote control is connected
GPIO_STOP = 2
GPIO_OPEN = 3
GPIO_CLOSE = 4

# define pigpio-host
HOST = "localhost"

# mapping of remote control command to GPIO pins
COMMANDS={
'stop': GPIO_STOP,
'open': GPIO_OPEN,
'close': GPIO_CLOSE,
}

def main(pin):
""" main function """
pi = gpio.pi(host=HOST)
if not pi.connected:
exit()

# prepare GPIO-Pin
pi.set_mode(pin, gpio.OUTPUT)
sleep(0.2)

pi.write(pin, 0)
sleep(0.2)
pi.write(pin, 1)

pi.stop()

if __name__ == "__main__":
try:
if len(sys.argv) >= 2:
cmd = sys.argv[1].lower()
if cmd in COMMANDS:
main(COMMANDS[cmd])
else:
print("Unknown command:", cmd)
else:
print("argument missing: open|close|down")
except KeyboardInterrupt:
print("KeyboardInterrupt")
# just make sure we don't transmit forever
pi = gpio.pi(host=HOST)
pi.stop()
finally:
#print("done")
pass

0 comments on commit 4cd5f40

Please sign in to comment.